You are viewing a plain text version of this content; the link to its canonical version was not preserved in this plain-text rendering.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/19 16:45:42 UTC

[arrow-datafusion] branch master updated: Show optimized physical and logical plans in EXPLAIN (#744)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fb600d  Show optimized physical and logical plans in EXPLAIN (#744)
3fb600d is described below

commit 3fb600df48ab1e53903b1a9bb12ebde33ad0856b
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Mon Jul 19 12:45:36 2021 -0400

    Show optimized physical and logical plans in EXPLAIN (#744)
    
    * Show optimized physical and logical plans in EXPLAIN
    
    * rewrite tests
    
    * reformat
---
 datafusion/src/logical_plan/builder.rs  | 18 +++++--
 datafusion/src/logical_plan/plan.rs     | 24 ++++++---
 datafusion/src/optimizer/utils.rs       |  4 +-
 datafusion/src/physical_plan/explain.rs | 18 +++++--
 datafusion/src/physical_plan/planner.rs | 94 +++++++++++++++++++++++----------
 datafusion/src/sql/planner.rs           |  4 +-
 datafusion/tests/sql.rs                 | 45 ++++++++++++++--
 datafusion/tests/user_defined_plan.rs   |  6 +--
 8 files changed, 158 insertions(+), 55 deletions(-)

diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 85c4aea..0335e29 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -399,7 +399,7 @@ impl LogicalPlanBuilder {
     /// Create an expression to represent the explanation of the plan
     pub fn explain(&self, verbose: bool) -> Result<Self> {
         let stringified_plans = vec![StringifiedPlan::new(
-            PlanType::LogicalPlan,
+            PlanType::InitialLogicalPlan,
             format!("{:#?}", self.plan.clone()),
         )];
 
@@ -740,14 +740,24 @@ mod tests {
     #[test]
     fn stringified_plan() {
         let stringified_plan =
-            StringifiedPlan::new(PlanType::LogicalPlan, "...the plan...");
+            StringifiedPlan::new(PlanType::InitialLogicalPlan, "...the plan...");
+        assert!(stringified_plan.should_display(true));
+        assert!(!stringified_plan.should_display(false)); // not in non verbose mode
+
+        let stringified_plan =
+            StringifiedPlan::new(PlanType::FinalLogicalPlan, "...the plan...");
         assert!(stringified_plan.should_display(true));
         assert!(stringified_plan.should_display(false)); // display in non verbose mode too
 
         let stringified_plan =
-            StringifiedPlan::new(PlanType::PhysicalPlan, "...the plan...");
+            StringifiedPlan::new(PlanType::InitialPhysicalPlan, "...the plan...");
         assert!(stringified_plan.should_display(true));
-        assert!(!stringified_plan.should_display(false));
+        assert!(!stringified_plan.should_display(false)); // not in non verbose mode
+
+        let stringified_plan =
+            StringifiedPlan::new(PlanType::FinalPhysicalPlan, "...the plan...");
+        assert!(stringified_plan.should_display(true));
+        assert!(stringified_plan.should_display(false)); // display in non verbose mode
 
         let stringified_plan = StringifiedPlan::new(
             PlanType::OptimizedLogicalPlan {
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index 42eaf8e..9a4daae 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -805,28 +805,35 @@ impl fmt::Debug for LogicalPlan {
     }
 }
 
-/// Represents which type of plan
+/// Identifies which kind of plan a `StringifiedPlan` holds, so that EXPLAIN
+/// output can label each of the several plans it captures
 #[derive(Debug, Clone, PartialEq)]
 pub enum PlanType {
     /// The initial LogicalPlan provided to DataFusion
-    LogicalPlan,
+    InitialLogicalPlan,
     /// The LogicalPlan which results from applying an optimizer pass
     OptimizedLogicalPlan {
         /// The name of the optimizer which produced this plan
         optimizer_name: String,
     },
-    /// The physical plan, prepared for execution
-    PhysicalPlan,
+    /// The final, fully optimized LogicalPlan that was converted to a physical plan
+    FinalLogicalPlan,
+    /// The initial physical plan, before any physical optimizer passes are applied
+    InitialPhysicalPlan,
+    /// The final, fully optimized physical plan which would be executed
+    FinalPhysicalPlan,
 }
 
 impl fmt::Display for PlanType {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
-            PlanType::LogicalPlan => write!(f, "logical_plan"),
+            PlanType::InitialLogicalPlan => write!(f, "initial_logical_plan"),
             PlanType::OptimizedLogicalPlan { optimizer_name } => {
                 write!(f, "logical_plan after {}", optimizer_name)
             }
-            PlanType::PhysicalPlan => write!(f, "physical_plan"),
+            PlanType::FinalLogicalPlan => write!(f, "logical_plan"),
+            PlanType::InitialPhysicalPlan => write!(f, "initial_physical_plan"),
+            PlanType::FinalPhysicalPlan => write!(f, "physical_plan"),
         }
     }
 }
@@ -854,7 +861,10 @@ impl StringifiedPlan {
     /// returns true if this plan should be displayed. Generally
     /// `verbose_mode = true` will display all available plans
     pub fn should_display(&self, verbose_mode: bool) -> bool {
-        self.plan_type == PlanType::LogicalPlan || verbose_mode
+        match self.plan_type {
+            PlanType::FinalLogicalPlan | PlanType::FinalPhysicalPlan => true,
+            _ => verbose_mode,
+        }
     }
 }
 
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index 1d19f06..88380ea 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -542,7 +542,7 @@ mod tests {
             &optimizer,
             true,
             &empty_plan,
-            &[StringifiedPlan::new(PlanType::LogicalPlan, "...")],
+            &[StringifiedPlan::new(PlanType::InitialLogicalPlan, "...")],
             schema.as_ref(),
             &ExecutionProps::new(),
         )?;
@@ -556,7 +556,7 @@ mod tests {
                 assert!(*verbose);
 
                 let expected_stringified_plans = vec![
-                    StringifiedPlan::new(PlanType::LogicalPlan, "..."),
+                    StringifiedPlan::new(PlanType::InitialLogicalPlan, "..."),
                     StringifiedPlan::new(
                         PlanType::OptimizedLogicalPlan {
                             optimizer_name: "test_optimizer".into(),
diff --git a/datafusion/src/physical_plan/explain.rs b/datafusion/src/physical_plan/explain.rs
index c838ce4..195a7a5 100644
--- a/datafusion/src/physical_plan/explain.rs
+++ b/datafusion/src/physical_plan/explain.rs
@@ -40,14 +40,21 @@ pub struct ExplainExec {
     schema: SchemaRef,
     /// The strings to be printed
     stringified_plans: Vec<StringifiedPlan>,
+    /// control which plans to print
+    verbose: bool,
 }
 
 impl ExplainExec {
     /// Create a new ExplainExec
-    pub fn new(schema: SchemaRef, stringified_plans: Vec<StringifiedPlan>) -> Self {
+    pub fn new(
+        schema: SchemaRef,
+        stringified_plans: Vec<StringifiedPlan>,
+        verbose: bool,
+    ) -> Self {
         ExplainExec {
             schema,
             stringified_plans,
+            verbose,
         }
     }
 
@@ -103,8 +110,13 @@ impl ExecutionPlan for ExplainExec {
         let mut type_builder = StringBuilder::new(self.stringified_plans.len());
         let mut plan_builder = StringBuilder::new(self.stringified_plans.len());
 
-        for p in &self.stringified_plans {
-            type_builder.append_value(&p.plan_type.to_string())?;
+        let plans_to_print = self
+            .stringified_plans
+            .iter()
+            .filter(|s| s.should_display(self.verbose));
+
+        for p in plans_to_print {
+            type_builder.append_value(p.plan_type.to_string())?;
             plan_builder.append_value(&*p.plan)?;
         }
 
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index df41683..5163e4b 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -240,8 +240,13 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
         logical_plan: &LogicalPlan,
         ctx_state: &ExecutionContextState,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let plan = self.create_initial_plan(logical_plan, ctx_state)?;
-        self.optimize_plan(plan, ctx_state)
+        match self.handle_explain(logical_plan, ctx_state)? {
+            Some(plan) => Ok(plan),
+            None => {
+                let plan = self.create_initial_plan(logical_plan, ctx_state)?;
+                self.optimize_plan(plan, ctx_state)
+            }
+        }
     }
 
     /// Create a physical expression from a logical expression
@@ -280,7 +285,7 @@ impl DefaultPhysicalPlanner {
         Self { extension_planners }
     }
 
-    /// Optimize a physical plan
+    /// Optimize a physical plan by applying each physical optimizer
     fn optimize_plan(
         &self,
         plan: Arc<dyn ExecutionPlan>,
@@ -749,32 +754,9 @@ impl DefaultPhysicalPlanner {
                     "Unsupported logical plan: CreateExternalTable".to_string(),
                 ))
             }
-            LogicalPlan::Explain {
-                verbose,
-                plan,
-                stringified_plans,
-                schema,
-            } => {
-                let input = self.create_initial_plan(plan, ctx_state)?;
-
-                let mut stringified_plans = stringified_plans
-                    .iter()
-                    .filter(|s| s.should_display(*verbose))
-                    .cloned()
-                    .collect::<Vec<_>>();
-
-                // add in the physical plan if requested
-                if *verbose {
-                    stringified_plans.push(StringifiedPlan::new(
-                        PlanType::PhysicalPlan,
-                        displayable(input.as_ref()).indent().to_string(),
-                    ));
-                }
-                Ok(Arc::new(ExplainExec::new(
-                    SchemaRef::new(schema.as_ref().to_owned().into()),
-                    stringified_plans,
-                )))
-            }
+            LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
+                "Unsupported logical plan: Explain must be root of the plan".to_string(),
+            )),
             LogicalPlan::Extension { node } => {
                 let physical_inputs = node
                     .inputs()
@@ -1315,6 +1297,60 @@ impl DefaultPhysicalPlanner {
             options,
         })
     }
+
+    /// Builds the `ExplainExec` for an EXPLAIN query, recording the final
+    /// logical plan plus the initial and optimized physical plans.
+    ///
+    /// Returns `Some(plan)` if `logical_plan` is an `Explain` node, and
+    /// `None` otherwise (the caller then plans and optimizes it as normal).
+    fn handle_explain(
+        &self,
+        logical_plan: &LogicalPlan,
+        ctx_state: &ExecutionContextState,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        if let LogicalPlan::Explain {
+            verbose,
+            plan,
+            stringified_plans,
+            schema,
+        } = logical_plan
+        {
+            let final_logical_plan = StringifiedPlan::new(
+                PlanType::FinalLogicalPlan,
+                plan.display_indent().to_string(),
+            );
+
+            let input = self.create_initial_plan(plan, ctx_state)?;
+
+            let initial_physical_plan = StringifiedPlan::new(
+                PlanType::InitialPhysicalPlan,
+                displayable(input.as_ref()).indent().to_string(),
+            );
+
+            let input = self.optimize_plan(input, ctx_state)?;
+
+            let final_physical_plan = StringifiedPlan::new(
+                PlanType::FinalPhysicalPlan,
+                displayable(input.as_ref()).indent().to_string(),
+            );
+
+            let stringified_plans = stringified_plans
+                .iter()
+                .cloned()
+                .chain(std::iter::once(final_logical_plan))
+                .chain(std::iter::once(initial_physical_plan))
+                .chain(std::iter::once(final_physical_plan))
+                .collect::<Vec<_>>();
+
+            Ok(Some(Arc::new(ExplainExec::new(
+                SchemaRef::new(schema.as_ref().to_owned().into()),
+                stringified_plans,
+                *verbose,
+            ))))
+        } else {
+            Ok(None)
+        }
+    }
 }
 
 fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 1437346..a4bb02c 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -234,8 +234,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         let plan = self.sql_statement_to_plan(statement)?;
 
         let stringified_plans = vec![StringifiedPlan::new(
-            PlanType::LogicalPlan,
-            format!("{:#?}", plan),
+            PlanType::InitialLogicalPlan,
+            plan.display_indent().to_string(),
         )];
 
         let schema = LogicalPlan::explain_schema();
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 9c7d079..875e982 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -1972,17 +1972,28 @@ async fn csv_explain() {
     register_aggregate_csv_by_sql(&mut ctx).await;
     let sql = "EXPLAIN SELECT c1 FROM aggregate_test_100 where c2 > 10";
     let actual = execute(&mut ctx, sql).await;
-    let expected = vec![vec![
-        "logical_plan",
-        "Projection: #aggregate_test_100.c1\
-            \n  Filter: #aggregate_test_100.c2 Gt Int64(10)\
-            \n    TableScan: aggregate_test_100 projection=None",
+    let actual = normalize_vec_for_explain(actual);
+    let expected = vec![
+        vec![
+            "logical_plan",
+            "Projection: #aggregate_test_100.c1\
+             \n  Filter: #aggregate_test_100.c2 Gt Int64(10)\
+             \n    TableScan: aggregate_test_100 projection=Some([0, 1])"
+        ],
+        vec!["physical_plan",
+             "ProjectionExec: expr=[c1@0 as c1]\
+              \n  CoalesceBatchesExec: target_batch_size=4096\
+              \n    FilterExec: CAST(c2@1 AS Int64) > 10\
+              \n      RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
+              \n        CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true\
+              \n"
     ]];
     assert_eq!(expected, actual);
 
     // Also, expect same result with lowercase explain
     let sql = "explain SELECT c1 FROM aggregate_test_100 where c2 > 10";
     let actual = execute(&mut ctx, sql).await;
+    let actual = normalize_vec_for_explain(actual);
     assert_eq!(expected, actual);
 }
 
@@ -3921,3 +3932,27 @@ async fn test_aggregation_with_bad_arguments() -> Result<()> {
     assert_eq!(err.to_string(), "Error during planning: Invalid or wrong number of arguments passed to aggregate: 'COUNT(DISTINCT )'");
     Ok(())
 }
+
+// Normalizes host-dependent parts of an explain plan (test data path, CPU count)
+fn normalize_for_explain(s: &str) -> String {
+    // Convert things like /Users/alamb/Software/arrow/testing/data/csv/aggregate_test_100.csv
+    // to ARROW_TEST_DATA/csv/aggregate_test_100.csv
+    let data_path = datafusion::test_util::arrow_test_data();
+    let s = s.replace(&data_path, "ARROW_TEST_DATA");
+
+    // convert things like partitioning=RoundRobinBatch(16) (16 = num_cpus::get())
+    // to partitioning=RoundRobinBatch(NUM_CORES)
+    let needle = format!("RoundRobinBatch({})", num_cpus::get());
+    s.replace(&needle, "RoundRobinBatch(NUM_CORES)")
+}
+
+/// Applies `normalize_for_explain` to every cell of every result row
+fn normalize_vec_for_explain(v: Vec<Vec<String>>) -> Vec<Vec<String>> {
+    v.into_iter()
+        .map(|l| {
+            l.into_iter()
+                .map(|s| normalize_for_explain(&s))
+                .collect::<Vec<_>>()
+        })
+        .collect::<Vec<_>>()
+}
diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs
index 21b4963..e1f8c76 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -163,9 +163,9 @@ async fn topk_plan() -> Result<()> {
     let mut ctx = setup_table(make_topk_context()).await?;
 
     let expected = vec![
-        "| logical_plan after topk                 | TopK: k=3                                                                            |",
-        "|                                         |   Projection: #sales.customer_id, #sales.revenue                                     |",
-        "|                                         |     TableScan: sales projection=Some([0, 1])                                         |",
+        "| logical_plan after topk                 | TopK: k=3                                                                                |",
+        "|                                         |   Projection: #sales.customer_id, #sales.revenue                                         |",
+        "|                                         |     TableScan: sales projection=Some([0, 1])                                             |",
     ].join("\n");
 
     let explain_query = format!("EXPLAIN VERBOSE {}", QUERY);