Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/07 19:34:35 UTC

[arrow-datafusion] branch master updated: enable explain for ballista (#2163)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 6504d2a78 enable explain for ballista (#2163)
6504d2a78 is described below

commit 6504d2a781391df659f9be2ea9a24b4e8a0fdfee
Author: Jie Han <11...@users.noreply.github.com>
AuthorDate: Fri Apr 8 03:34:28 2022 +0800

    enable explain for ballista (#2163)
    
    * explain
    
    * fmt
---
 ballista/rust/core/proto/ballista.proto            |  9 ++-
 ballista/rust/core/proto/datafusion.proto          | 24 +++++++
 ballista/rust/core/src/serde/physical_plan/mod.rs  | 77 +++++++++++++++-------
 datafusion/core/src/optimizer/limit_push_down.rs   | 18 ++---
 .../core/src/optimizer/projection_push_down.rs     | 36 +++++-----
 datafusion/core/src/physical_plan/explain.rs       |  5 ++
 datafusion/proto/proto/datafusion.proto            | 24 +++++++
 datafusion/proto/src/from_proto.rs                 | 43 +++++++++++-
 datafusion/proto/src/to_proto.rs                   | 47 ++++++++++++-
 9 files changed, 225 insertions(+), 58 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 1c50db6fd..2f69d4a26 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -183,7 +183,7 @@ message AnalyzeNode {
   bool verbose = 2;
 }
 
-message ExplainNode{
+message ExplainNode {
   LogicalPlanNode input = 1;
   bool verbose = 2;
 }
@@ -269,6 +269,7 @@ message PhysicalPlanNode {
     AvroScanExecNode avro_scan = 20;
     PhysicalExtensionNode extension = 21;
     UnionExecNode union = 22;
+    ExplainExecNode explain = 23;
   }
 }
 
@@ -461,6 +462,12 @@ message UnionExecNode {
   repeated PhysicalPlanNode inputs = 1;
 }
 
+message ExplainExecNode {
+  datafusion.Schema schema = 1;
+  repeated datafusion.StringifiedPlan stringified_plans = 2;
+  bool verbose = 3;
+}
+
 message CrossJoinExecNode {
   PhysicalPlanNode left = 1;
   PhysicalPlanNode right = 2;
diff --git a/ballista/rust/core/proto/datafusion.proto b/ballista/rust/core/proto/datafusion.proto
index 6bf163a64..1dc9b34f7 100644
--- a/ballista/rust/core/proto/datafusion.proto
+++ b/ballista/rust/core/proto/datafusion.proto
@@ -515,3 +515,27 @@ message ArrowType{
 //   }
 //}
 message EmptyMessage{}
+
+message OptimizedLogicalPlanType {
+  string optimizer_name = 1;
+}
+
+message OptimizedPhysicalPlanType {
+  string optimizer_name = 1;
+}
+
+message PlanType {
+  oneof plan_type_enum {
+    EmptyMessage InitialLogicalPlan = 1;
+    OptimizedLogicalPlanType OptimizedLogicalPlan = 2;
+    EmptyMessage FinalLogicalPlan = 3;
+    EmptyMessage InitialPhysicalPlan = 4;
+    OptimizedPhysicalPlanType OptimizedPhysicalPlan = 5;
+    EmptyMessage FinalPhysicalPlan = 6;
+  }
+}
+
+message StringifiedPlan {
+  PlanType plan_type = 1;
+  string plan = 2;
+}
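
The oneof above maps to the Rust enum crate::protobuf::plan_type::PlanTypeEnum in the prost-generated code, which the conversion code later in this commit imports. A minimal sketch of building one of these messages by hand follows; the optimizer name and plan text are illustrative values only, not taken from the commit:

    use crate::protobuf;
    use crate::protobuf::plan_type::PlanTypeEnum;

    // Sketch: a StringifiedPlan proto recording the output of one optimizer pass.
    fn example_stringified_plan() -> protobuf::StringifiedPlan {
        protobuf::StringifiedPlan {
            // The oneof field `plan_type_enum` selects which pass produced the plan text.
            plan_type: Some(protobuf::PlanType {
                plan_type_enum: Some(PlanTypeEnum::OptimizedLogicalPlan(
                    protobuf::OptimizedLogicalPlanType {
                        optimizer_name: "projection_push_down".to_string(),
                    },
                )),
            }),
            plan: "Projection: #a".to_string(),
        }
    }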
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index 3abb0713d..32eda9f3a 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -15,24 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::error::BallistaError;
-use crate::execution_plans::{
-    ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
-};
-use crate::serde::physical_plan::from_proto::{
-    parse_physical_expr, parse_protobuf_hash_partitioning,
-};
-use crate::serde::protobuf::physical_expr_node::ExprType;
-use crate::serde::protobuf::physical_plan_node::PhysicalPlanType;
-use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
+use std::convert::TryInto;
+use std::sync::Arc;
+
+use prost::bytes::BufMut;
+use prost::Message;
 
-use crate::serde::protobuf::{PhysicalExtensionNode, PhysicalPlanNode};
-use crate::serde::scheduler::PartitionLocation;
-use crate::serde::{
-    byte_to_string, proto_error, protobuf, str_to_byte, AsExecutionPlan,
-    PhysicalExtensionCodec,
-};
-use crate::{convert_required, into_physical_plan, into_required};
 use datafusion::arrow::compute::SortOptions;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
@@ -45,6 +33,7 @@ use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::cross_join::CrossJoinExec;
 use datafusion::physical_plan::empty::EmptyExec;
+use datafusion::physical_plan::explain::ExplainExec;
 use datafusion::physical_plan::expressions::{Column, PhysicalSortExpr};
 use datafusion::physical_plan::file_format::{
     AvroExec, CsvExec, FileScanConfig, ParquetExec,
@@ -62,10 +51,24 @@ use datafusion::physical_plan::{
     AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr,
 };
 use datafusion_proto::from_proto::parse_expr;
-use prost::bytes::BufMut;
-use prost::Message;
-use std::convert::TryInto;
-use std::sync::Arc;
+
+use crate::error::BallistaError;
+use crate::execution_plans::{
+    ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
+};
+use crate::serde::physical_plan::from_proto::{
+    parse_physical_expr, parse_protobuf_hash_partitioning,
+};
+use crate::serde::protobuf::physical_expr_node::ExprType;
+use crate::serde::protobuf::physical_plan_node::PhysicalPlanType;
+use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
+use crate::serde::protobuf::{PhysicalExtensionNode, PhysicalPlanNode};
+use crate::serde::scheduler::PartitionLocation;
+use crate::serde::{
+    byte_to_string, proto_error, protobuf, str_to_byte, AsExecutionPlan,
+    PhysicalExtensionCodec,
+};
+use crate::{convert_required, into_physical_plan, into_required};
 
 pub mod from_proto;
 pub mod to_proto;
@@ -103,6 +106,15 @@ impl AsExecutionPlan for PhysicalPlanNode {
             ))
         })?;
         match plan {
+            PhysicalPlanType::Explain(explain) => Ok(Arc::new(ExplainExec::new(
+                Arc::new(explain.schema.as_ref().unwrap().try_into()?),
+                explain
+                    .stringified_plans
+                    .iter()
+                    .map(|plan| plan.into())
+                    .collect(),
+                explain.verbose,
+            ))),
             PhysicalPlanType::Projection(projection) => {
                 let input: Arc<dyn ExecutionPlan> = into_physical_plan!(
                     projection.input,
@@ -587,7 +599,21 @@ impl AsExecutionPlan for PhysicalPlanNode {
         let plan_clone = plan.clone();
         let plan = plan.as_any();
 
-        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
+        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
+            Ok(protobuf::PhysicalPlanNode {
+                physical_plan_type: Some(PhysicalPlanType::Explain(
+                    protobuf::ExplainExecNode {
+                        schema: Some(exec.schema().as_ref().into()),
+                        stringified_plans: exec
+                            .stringified_plans()
+                            .iter()
+                            .map(|plan| plan.into())
+                            .collect(),
+                        verbose: exec.verbose(),
+                    },
+                )),
+            })
+        } else if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
             let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
                 exec.input().to_owned(),
                 extension_codec,
@@ -1038,7 +1064,6 @@ mod roundtrip_tests {
     use std::ops::Deref;
     use std::sync::Arc;
 
-    use crate::serde::{AsExecutionPlan, BallistaCodec};
     use datafusion::arrow::array::ArrayRef;
     use datafusion::execution::context::ExecutionProps;
     use datafusion::logical_plan::create_udf;
@@ -1071,10 +1096,12 @@ mod roundtrip_tests {
         scalar::ScalarValue,
     };
 
-    use super::super::super::error::Result;
-    use super::super::protobuf;
     use crate::execution_plans::ShuffleWriterExec;
     use crate::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode};
+    use crate::serde::{AsExecutionPlan, BallistaCodec};
+
+    use super::super::super::error::Result;
+    use super::super::protobuf;
 
     fn roundtrip_test(exec_plan: Arc<dyn ExecutionPlan>) -> Result<()> {
         let ctx = SessionContext::new();
diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs
index 4fa6e2786..0c68f1761 100644
--- a/datafusion/core/src/optimizer/limit_push_down.rs
+++ b/datafusion/core/src/optimizer/limit_push_down.rs
@@ -39,10 +39,10 @@ impl LimitPushDown {
 }
 
 fn limit_push_down(
-    optimizer: &LimitPushDown,
+    _optimizer: &LimitPushDown,
     upper_limit: Option<usize>,
     plan: &LogicalPlan,
-    execution_props: &ExecutionProps,
+    _execution_props: &ExecutionProps,
 ) -> Result<LogicalPlan> {
     match (plan, upper_limit) {
         (LogicalPlan::Limit(Limit { n, input }), upper_limit) => {
@@ -51,10 +51,10 @@ fn limit_push_down(
                 n: smallest,
                 // push down limit to plan (minimum of upper limit and current limit)
                 input: Arc::new(limit_push_down(
-                    optimizer,
+                    _optimizer,
                     Some(smallest),
                     input.as_ref(),
-                    execution_props,
+                    _execution_props,
                 )?),
             }))
         }
@@ -91,10 +91,10 @@ fn limit_push_down(
             Ok(LogicalPlan::Projection(Projection {
                 expr: expr.clone(),
                 input: Arc::new(limit_push_down(
-                    optimizer,
+                    _optimizer,
                     upper_limit,
                     input.as_ref(),
-                    execution_props,
+                    _execution_props,
                 )?),
                 schema: schema.clone(),
                 alias: alias.clone(),
@@ -115,10 +115,10 @@ fn limit_push_down(
                     Ok(LogicalPlan::Limit(Limit {
                         n: upper_limit,
                         input: Arc::new(limit_push_down(
-                            optimizer,
+                            _optimizer,
                             Some(upper_limit),
                             x,
-                            execution_props,
+                            _execution_props,
                         )?),
                     }))
                 })
@@ -138,7 +138,7 @@ fn limit_push_down(
             let inputs = plan.inputs();
             let new_inputs = inputs
                 .iter()
-                .map(|plan| limit_push_down(optimizer, None, plan, execution_props))
+                .map(|plan| limit_push_down(_optimizer, None, plan, _execution_props))
                 .collect::<Result<Vec<_>>>()?;
 
             utils::from_plan(plan, &expr, &new_inputs)
diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs
index 7f43b5595..5b61ace11 100644
--- a/datafusion/core/src/optimizer/projection_push_down.rs
+++ b/datafusion/core/src/optimizer/projection_push_down.rs
@@ -126,11 +126,11 @@ fn get_projected_schema(
 
 /// Recursively traverses the logical plan, removing expressions that are not needed.
 fn optimize_plan(
-    optimizer: &ProjectionPushDown,
+    _optimizer: &ProjectionPushDown,
     plan: &LogicalPlan,
     required_columns: &HashSet<Column>, // set of columns required up to this step
     has_projection: bool,
-    execution_props: &ExecutionProps,
+    _execution_props: &ExecutionProps,
 ) -> Result<LogicalPlan> {
     let mut new_required_columns = required_columns.clone();
     match plan {
@@ -165,11 +165,11 @@ fn optimize_plan(
                 })?;
 
             let new_input = optimize_plan(
-                optimizer,
+                _optimizer,
                 input,
                 &new_required_columns,
                 true,
-                execution_props,
+                _execution_props,
             )?;
 
             let new_required_columns_optimized = new_input
@@ -211,19 +211,19 @@ fn optimize_plan(
             }
 
             let optimized_left = Arc::new(optimize_plan(
-                optimizer,
+                _optimizer,
                 left,
                 &new_required_columns,
                 true,
-                execution_props,
+                _execution_props,
             )?);
 
             let optimized_right = Arc::new(optimize_plan(
-                optimizer,
+                _optimizer,
                 right,
                 &new_required_columns,
                 true,
-                execution_props,
+                _execution_props,
             )?);
 
             let schema = build_join_schema(
@@ -272,11 +272,11 @@ fn optimize_plan(
             )?;
 
             LogicalPlanBuilder::from(optimize_plan(
-                optimizer,
+                _optimizer,
                 input,
                 &new_required_columns,
                 true,
-                execution_props,
+                _execution_props,
             )?)
             .window(new_window_expr)?
             .build()
@@ -324,11 +324,11 @@ fn optimize_plan(
                 group_expr: group_expr.clone(),
                 aggr_expr: new_aggr_expr,
                 input: Arc::new(optimize_plan(
-                    optimizer,
+                    _optimizer,
                     input,
                     &new_required_columns,
                     true,
-                    execution_props,
+                    _execution_props,
                 )?),
                 schema: DFSchemaRef::new(new_schema),
             }))
@@ -373,11 +373,11 @@ fn optimize_plan(
 
             Ok(LogicalPlan::Analyze(Analyze {
                 input: Arc::new(optimize_plan(
-                    optimizer,
+                    _optimizer,
                     &a.input,
                     &required_columns,
                     false,
-                    execution_props,
+                    _execution_props,
                 )?),
                 verbose: a.verbose,
                 schema: a.schema.clone(),
@@ -409,11 +409,11 @@ fn optimize_plan(
                             new_required_columns.insert(f.qualified_column());
                         });
                     optimize_plan(
-                        optimizer,
+                        _optimizer,
                         input_plan,
                         &new_required_columns,
                         has_projection,
-                        execution_props,
+                        _execution_props,
                     )
                 })
                 .collect::<Result<Vec<_>>>()?;
@@ -457,11 +457,11 @@ fn optimize_plan(
                 .iter()
                 .map(|input_plan| {
                     optimize_plan(
-                        optimizer,
+                        _optimizer,
                         input_plan,
                         &new_required_columns,
                         has_projection,
-                        execution_props,
+                        _execution_props,
                     )
                 })
                 .collect::<Result<Vec<_>>>()?;
diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs
index d09eae3ae..fd5ff03ca 100644
--- a/datafusion/core/src/physical_plan/explain.rs
+++ b/datafusion/core/src/physical_plan/explain.rs
@@ -66,6 +66,11 @@ impl ExplainExec {
     pub fn stringified_plans(&self) -> &[StringifiedPlan] {
         &self.stringified_plans
     }
+
+    /// access to verbose
+    pub fn verbose(&self) -> bool {
+        self.verbose
+    }
 }
 
 #[async_trait]
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 1bb9aef3c..1e5a797ce 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -540,3 +540,27 @@ message ArrowType{
 //   }
 //}
 message EmptyMessage{}
+
+message OptimizedLogicalPlanType {
+  string optimizer_name = 1;
+}
+
+message OptimizedPhysicalPlanType {
+  string optimizer_name = 1;
+}
+
+message PlanType {
+  oneof plan_type_enum {
+    EmptyMessage InitialLogicalPlan = 1;
+    OptimizedLogicalPlanType OptimizedLogicalPlan = 2;
+    EmptyMessage FinalLogicalPlan = 3;
+    EmptyMessage InitialPhysicalPlan = 4;
+    OptimizedPhysicalPlanType OptimizedPhysicalPlan = 5;
+    EmptyMessage FinalPhysicalPlan = 6;
+  }
+}
+
+message StringifiedPlan {
+  PlanType plan_type = 1;
+  string plan = 2;
+}
diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs
index c98fc824b..0aa24dd8e 100644
--- a/datafusion/proto/src/from_proto.rs
+++ b/datafusion/proto/src/from_proto.rs
@@ -16,7 +16,13 @@
 // under the License.
 
 use crate::protobuf;
-use datafusion::logical_plan::FunctionRegistry;
+use crate::protobuf::plan_type::PlanTypeEnum::{
+    FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan,
+    OptimizedLogicalPlan, OptimizedPhysicalPlan,
+};
+use crate::protobuf::{OptimizedLogicalPlanType, OptimizedPhysicalPlanType};
+use datafusion::logical_plan::plan::StringifiedPlan;
+use datafusion::logical_plan::{FunctionRegistry, PlanType};
 use datafusion::prelude::bit_length;
 use datafusion::{
     arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit, UnionMode},
@@ -363,6 +369,37 @@ impl TryFrom<&protobuf::Field> for Field {
     }
 }
 
+impl From<&protobuf::StringifiedPlan> for StringifiedPlan {
+    fn from(stringified_plan: &protobuf::StringifiedPlan) -> Self {
+        Self {
+            plan_type: match stringified_plan
+                .plan_type
+                .as_ref()
+                .unwrap()
+                .plan_type_enum
+                .as_ref()
+                .unwrap()
+            {
+                InitialLogicalPlan(_) => PlanType::InitialLogicalPlan,
+                OptimizedLogicalPlan(OptimizedLogicalPlanType { optimizer_name }) => {
+                    PlanType::OptimizedLogicalPlan {
+                        optimizer_name: optimizer_name.clone(),
+                    }
+                }
+                FinalLogicalPlan(_) => PlanType::FinalLogicalPlan,
+                InitialPhysicalPlan(_) => PlanType::InitialPhysicalPlan,
+                OptimizedPhysicalPlan(OptimizedPhysicalPlanType { optimizer_name }) => {
+                    PlanType::OptimizedPhysicalPlan {
+                        optimizer_name: optimizer_name.clone(),
+                    }
+                }
+                FinalPhysicalPlan(_) => PlanType::FinalPhysicalPlan,
+            },
+            plan: Arc::new(stringified_plan.plan.clone()),
+        }
+    }
+}
+
 impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
     fn from(f: &protobuf::ScalarFunction) -> Self {
         use protobuf::ScalarFunction;
@@ -721,7 +758,7 @@ impl TryFrom<&protobuf::PrimitiveScalarType> for ScalarValue {
 
         Ok(match scalar {
             PrimitiveScalarType::Null => {
-                return Err(proto_error("Untyped null is an invalid scalar value"))
+                return Err(proto_error("Untyped null is an invalid scalar value"));
             }
             PrimitiveScalarType::Bool => Self::Boolean(None),
             PrimitiveScalarType::Uint8 => Self::UInt8(None),
@@ -1450,7 +1487,7 @@ fn typechecked_scalar_value_conversion(
                     PrimitiveScalarType::Null => {
                         return Err(proto_error(
                             "Untyped scalar null is not a valid scalar value",
-                        ))
+                        ));
                     }
                     PrimitiveScalarType::Decimal128 => {
                         ScalarValue::Decimal128(None, 0, 0)
diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs
index 332cbbde7..1e05f8047 100644
--- a/datafusion/proto/src/to_proto.rs
+++ b/datafusion/proto/src/to_proto.rs
@@ -20,7 +20,16 @@
 //! processes.
 
 use crate::protobuf;
+use crate::protobuf::plan_type::PlanTypeEnum::{
+    FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan,
+    OptimizedLogicalPlan, OptimizedPhysicalPlan,
+};
+use crate::protobuf::{
+    EmptyMessage, OptimizedLogicalPlanType, OptimizedPhysicalPlanType,
+};
 
+use datafusion::logical_plan::plan::StringifiedPlan;
+use datafusion::logical_plan::PlanType;
 use datafusion::{
     arrow::datatypes::{
         DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, UnionMode,
@@ -143,8 +152,6 @@ impl From<&DataType> for protobuf::ArrowType {
 
 impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
     fn from(val: &DataType) -> Self {
-        use protobuf::EmptyMessage;
-
         match val {
             DataType::Null => Self::None(EmptyMessage {}),
             DataType::Boolean => Self::Bool(EmptyMessage {}),
@@ -294,6 +301,42 @@ impl From<&DFSchemaRef> for protobuf::DfSchema {
     }
 }
 
+impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
+    fn from(stringified_plan: &StringifiedPlan) -> Self {
+        Self {
+            plan_type: match stringified_plan.clone().plan_type {
+                PlanType::InitialLogicalPlan => Some(protobuf::PlanType {
+                    plan_type_enum: Some(InitialLogicalPlan(EmptyMessage {})),
+                }),
+                PlanType::OptimizedLogicalPlan { optimizer_name } => {
+                    Some(protobuf::PlanType {
+                        plan_type_enum: Some(OptimizedLogicalPlan(
+                            OptimizedLogicalPlanType { optimizer_name },
+                        )),
+                    })
+                }
+                PlanType::FinalLogicalPlan => Some(protobuf::PlanType {
+                    plan_type_enum: Some(FinalLogicalPlan(EmptyMessage {})),
+                }),
+                PlanType::InitialPhysicalPlan => Some(protobuf::PlanType {
+                    plan_type_enum: Some(InitialPhysicalPlan(EmptyMessage {})),
+                }),
+                PlanType::OptimizedPhysicalPlan { optimizer_name } => {
+                    Some(protobuf::PlanType {
+                        plan_type_enum: Some(OptimizedPhysicalPlan(
+                            OptimizedPhysicalPlanType { optimizer_name },
+                        )),
+                    })
+                }
+                PlanType::FinalPhysicalPlan => Some(protobuf::PlanType {
+                    plan_type_enum: Some(FinalPhysicalPlan(EmptyMessage {})),
+                }),
+            },
+            plan: stringified_plan.plan.to_string(),
+        }
+    }
+}
+
 impl From<&AggregateFunction> for protobuf::AggregateFunction {
     fn from(value: &AggregateFunction) -> Self {
         match value {
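
A minimal sketch of how the new Explain round trip might be exercised with the roundtrip_test helper from the test module above; the schema columns and plan text are assumptions for illustration, not values taken from this commit:

    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::logical_plan::plan::StringifiedPlan;
    use datafusion::logical_plan::PlanType;
    use datafusion::physical_plan::explain::ExplainExec;

    #[test]
    fn roundtrip_explain() -> Result<()> {
        // EXPLAIN output is conventionally two Utf8 columns; assumed here.
        let schema = Arc::new(Schema::new(vec![
            Field::new("plan_type", DataType::Utf8, false),
            Field::new("plan", DataType::Utf8, false),
        ]));
        // One stringified plan entry; the plan text is a placeholder.
        let stringified_plans = vec![StringifiedPlan {
            plan_type: PlanType::FinalLogicalPlan,
            plan: Arc::new("Projection: #a".to_string()),
        }];
        // Serialize through protobuf and back using the existing helper.
        roundtrip_test(Arc::new(ExplainExec::new(schema, stringified_plans, true)))
    }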