You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original page) is not preserved in this version.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/19 16:45:42 UTC
[arrow-datafusion] branch master updated: Show optimized physical
and logical plans in EXPLAIN (#744)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 3fb600d Show optimized physical and logical plans in EXPLAIN (#744)
3fb600d is described below
commit 3fb600df48ab1e53903b1a9bb12ebde33ad0856b
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Mon Jul 19 12:45:36 2021 -0400
Show optimized physical and logical plans in EXPLAIN (#744)
* Show optimized physical and logical plans in EXPLAIN
* rewrite tests
* reformat
---
datafusion/src/logical_plan/builder.rs | 18 +++++--
datafusion/src/logical_plan/plan.rs | 24 ++++++---
datafusion/src/optimizer/utils.rs | 4 +-
datafusion/src/physical_plan/explain.rs | 18 +++++--
datafusion/src/physical_plan/planner.rs | 94 +++++++++++++++++++++++----------
datafusion/src/sql/planner.rs | 4 +-
datafusion/tests/sql.rs | 45 ++++++++++++++--
datafusion/tests/user_defined_plan.rs | 6 +--
8 files changed, 158 insertions(+), 55 deletions(-)
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 85c4aea..0335e29 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -399,7 +399,7 @@ impl LogicalPlanBuilder {
/// Create an expression to represent the explanation of the plan
pub fn explain(&self, verbose: bool) -> Result<Self> {
let stringified_plans = vec![StringifiedPlan::new(
- PlanType::LogicalPlan,
+ PlanType::InitialLogicalPlan,
format!("{:#?}", self.plan.clone()),
)];
@@ -740,14 +740,24 @@ mod tests {
#[test]
fn stringified_plan() {
let stringified_plan =
- StringifiedPlan::new(PlanType::LogicalPlan, "...the plan...");
+ StringifiedPlan::new(PlanType::InitialLogicalPlan, "...the plan...");
+ assert!(stringified_plan.should_display(true));
+ assert!(!stringified_plan.should_display(false)); // not in non verbose mode
+
+ let stringified_plan =
+ StringifiedPlan::new(PlanType::FinalLogicalPlan, "...the plan...");
assert!(stringified_plan.should_display(true));
assert!(stringified_plan.should_display(false)); // display in non verbose mode too
let stringified_plan =
- StringifiedPlan::new(PlanType::PhysicalPlan, "...the plan...");
+ StringifiedPlan::new(PlanType::InitialPhysicalPlan, "...the plan...");
assert!(stringified_plan.should_display(true));
- assert!(!stringified_plan.should_display(false));
+ assert!(!stringified_plan.should_display(false)); // not in non verbose mode
+
+ let stringified_plan =
+ StringifiedPlan::new(PlanType::FinalPhysicalPlan, "...the plan...");
+ assert!(stringified_plan.should_display(true));
+ assert!(stringified_plan.should_display(false)); // display in non verbose mode
let stringified_plan = StringifiedPlan::new(
PlanType::OptimizedLogicalPlan {
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index 42eaf8e..9a4daae 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -805,28 +805,35 @@ impl fmt::Debug for LogicalPlan {
}
}
-/// Represents which type of plan
+/// Represents which type of plan, when storing multiple
+/// for use in EXPLAIN plans
#[derive(Debug, Clone, PartialEq)]
pub enum PlanType {
/// The initial LogicalPlan provided to DataFusion
- LogicalPlan,
+ InitialLogicalPlan,
/// The LogicalPlan which results from applying an optimizer pass
OptimizedLogicalPlan {
/// The name of the optimizer which produced this plan
optimizer_name: String,
},
- /// The physical plan, prepared for execution
- PhysicalPlan,
+ /// The final, fully optimized LogicalPlan that was converted to a physical plan
+ FinalLogicalPlan,
+ /// The initial physical plan, prepared for execution
+ InitialPhysicalPlan,
+ /// The final, fully optimized physical plan which would be executed
+ FinalPhysicalPlan,
}
impl fmt::Display for PlanType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
- PlanType::LogicalPlan => write!(f, "logical_plan"),
+ PlanType::InitialLogicalPlan => write!(f, "initial_logical_plan"),
PlanType::OptimizedLogicalPlan { optimizer_name } => {
write!(f, "logical_plan after {}", optimizer_name)
}
- PlanType::PhysicalPlan => write!(f, "physical_plan"),
+ PlanType::FinalLogicalPlan => write!(f, "logical_plan"),
+ PlanType::InitialPhysicalPlan => write!(f, "initial_physical_plan"),
+ PlanType::FinalPhysicalPlan => write!(f, "physical_plan"),
}
}
}
@@ -854,7 +861,10 @@ impl StringifiedPlan {
/// returns true if this plan should be displayed. Generally
/// `verbose_mode = true` will display all available plans
pub fn should_display(&self, verbose_mode: bool) -> bool {
- self.plan_type == PlanType::LogicalPlan || verbose_mode
+ match self.plan_type {
+ PlanType::FinalLogicalPlan | PlanType::FinalPhysicalPlan => true,
+ _ => verbose_mode,
+ }
}
}
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index 1d19f06..88380ea 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -542,7 +542,7 @@ mod tests {
&optimizer,
true,
&empty_plan,
- &[StringifiedPlan::new(PlanType::LogicalPlan, "...")],
+ &[StringifiedPlan::new(PlanType::InitialLogicalPlan, "...")],
schema.as_ref(),
&ExecutionProps::new(),
)?;
@@ -556,7 +556,7 @@ mod tests {
assert!(*verbose);
let expected_stringified_plans = vec![
- StringifiedPlan::new(PlanType::LogicalPlan, "..."),
+ StringifiedPlan::new(PlanType::InitialLogicalPlan, "..."),
StringifiedPlan::new(
PlanType::OptimizedLogicalPlan {
optimizer_name: "test_optimizer".into(),
diff --git a/datafusion/src/physical_plan/explain.rs b/datafusion/src/physical_plan/explain.rs
index c838ce4..195a7a5 100644
--- a/datafusion/src/physical_plan/explain.rs
+++ b/datafusion/src/physical_plan/explain.rs
@@ -40,14 +40,21 @@ pub struct ExplainExec {
schema: SchemaRef,
/// The strings to be printed
stringified_plans: Vec<StringifiedPlan>,
+ /// Controls which plans are printed (passed to `StringifiedPlan::should_display`)
+ verbose: bool,
}
impl ExplainExec {
/// Create a new ExplainExec
- pub fn new(schema: SchemaRef, stringified_plans: Vec<StringifiedPlan>) -> Self {
+ pub fn new(
+ schema: SchemaRef,
+ stringified_plans: Vec<StringifiedPlan>,
+ verbose: bool,
+ ) -> Self {
ExplainExec {
schema,
stringified_plans,
+ verbose,
}
}
@@ -103,8 +110,13 @@ impl ExecutionPlan for ExplainExec {
let mut type_builder = StringBuilder::new(self.stringified_plans.len());
let mut plan_builder = StringBuilder::new(self.stringified_plans.len());
- for p in &self.stringified_plans {
- type_builder.append_value(&p.plan_type.to_string())?;
+ let plans_to_print = self
+ .stringified_plans
+ .iter()
+ .filter(|s| s.should_display(self.verbose));
+
+ for p in plans_to_print {
+ type_builder.append_value(p.plan_type.to_string())?;
plan_builder.append_value(&*p.plan)?;
}
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index df41683..5163e4b 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -240,8 +240,13 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>> {
- let plan = self.create_initial_plan(logical_plan, ctx_state)?;
- self.optimize_plan(plan, ctx_state)
+ match self.handle_explain(logical_plan, ctx_state)? {
+ Some(plan) => Ok(plan),
+ None => {
+ let plan = self.create_initial_plan(logical_plan, ctx_state)?;
+ self.optimize_plan(plan, ctx_state)
+ }
+ }
}
/// Create a physical expression from a logical expression
@@ -280,7 +285,7 @@ impl DefaultPhysicalPlanner {
Self { extension_planners }
}
- /// Optimize a physical plan
+ /// Optimize a physical plan by applying each physical optimizer
fn optimize_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
@@ -749,32 +754,9 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: CreateExternalTable".to_string(),
))
}
- LogicalPlan::Explain {
- verbose,
- plan,
- stringified_plans,
- schema,
- } => {
- let input = self.create_initial_plan(plan, ctx_state)?;
-
- let mut stringified_plans = stringified_plans
- .iter()
- .filter(|s| s.should_display(*verbose))
- .cloned()
- .collect::<Vec<_>>();
-
- // add in the physical plan if requested
- if *verbose {
- stringified_plans.push(StringifiedPlan::new(
- PlanType::PhysicalPlan,
- displayable(input.as_ref()).indent().to_string(),
- ));
- }
- Ok(Arc::new(ExplainExec::new(
- SchemaRef::new(schema.as_ref().to_owned().into()),
- stringified_plans,
- )))
- }
+ LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
+ "Unsupported logical plan: Explain must be root of the plan".to_string(),
+ )),
LogicalPlan::Extension { node } => {
let physical_inputs = node
.inputs()
@@ -1315,6 +1297,60 @@ impl DefaultPhysicalPlanner {
options,
})
}
+
+ /// Handles capturing the various plans for EXPLAIN queries
+ ///
+ /// Returns `Some(plan)` (an `ExplainExec` holding the captured plans)
+ /// if `logical_plan` is an `Explain`, and `None` if it is not (and
+ /// thus needs to be planned and optimized as normal)
+ fn handle_explain(
+ &self,
+ logical_plan: &LogicalPlan,
+ ctx_state: &ExecutionContextState,
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ if let LogicalPlan::Explain {
+ verbose,
+ plan,
+ stringified_plans,
+ schema,
+ } = logical_plan
+ {
+ let final_logical_plan = StringifiedPlan::new(
+ PlanType::FinalLogicalPlan,
+ plan.display_indent().to_string(),
+ );
+
+ let input = self.create_initial_plan(plan, ctx_state)?;
+
+ let initial_physical_plan = StringifiedPlan::new(
+ PlanType::InitialPhysicalPlan,
+ displayable(input.as_ref()).indent().to_string(),
+ );
+
+ let input = self.optimize_plan(input, ctx_state)?;
+
+ let final_physical_plan = StringifiedPlan::new(
+ PlanType::FinalPhysicalPlan,
+ displayable(input.as_ref()).indent().to_string(),
+ );
+
+ let stringified_plans = stringified_plans
+ .iter()
+ .cloned()
+ .chain(std::iter::once(final_logical_plan))
+ .chain(std::iter::once(initial_physical_plan))
+ .chain(std::iter::once(final_physical_plan))
+ .collect::<Vec<_>>();
+
+ Ok(Some(Arc::new(ExplainExec::new(
+ SchemaRef::new(schema.as_ref().to_owned().into()),
+ stringified_plans,
+ *verbose,
+ ))))
+ } else {
+ Ok(None)
+ }
+ }
}
fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 1437346..a4bb02c 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -234,8 +234,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let plan = self.sql_statement_to_plan(statement)?;
let stringified_plans = vec![StringifiedPlan::new(
- PlanType::LogicalPlan,
- format!("{:#?}", plan),
+ PlanType::InitialLogicalPlan,
+ plan.display_indent().to_string(),
)];
let schema = LogicalPlan::explain_schema();
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 9c7d079..875e982 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -1972,17 +1972,28 @@ async fn csv_explain() {
register_aggregate_csv_by_sql(&mut ctx).await;
let sql = "EXPLAIN SELECT c1 FROM aggregate_test_100 where c2 > 10";
let actual = execute(&mut ctx, sql).await;
- let expected = vec![vec![
- "logical_plan",
- "Projection: #aggregate_test_100.c1\
- \n Filter: #aggregate_test_100.c2 Gt Int64(10)\
- \n TableScan: aggregate_test_100 projection=None",
+ let actual = normalize_vec_for_explain(actual);
+ let expected = vec![
+ vec![
+ "logical_plan",
+ "Projection: #aggregate_test_100.c1\
+ \n Filter: #aggregate_test_100.c2 Gt Int64(10)\
+ \n TableScan: aggregate_test_100 projection=Some([0, 1])"
+ ],
+ vec!["physical_plan",
+ "ProjectionExec: expr=[c1@0 as c1]\
+ \n CoalesceBatchesExec: target_batch_size=4096\
+ \n FilterExec: CAST(c2@1 AS Int64) > 10\
+ \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
+ \n CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true\
+ \n"
]];
assert_eq!(expected, actual);
// Also, expect same result with lowercase explain
let sql = "explain SELECT c1 FROM aggregate_test_100 where c2 > 10";
let actual = execute(&mut ctx, sql).await;
+ let actual = normalize_vec_for_explain(actual);
assert_eq!(expected, actual);
}
@@ -3921,3 +3932,27 @@ async fn test_aggregation_with_bad_arguments() -> Result<()> {
assert_eq!(err.to_string(), "Error during planning: Invalid or wrong number of arguments passed to aggregate: 'COUNT(DISTINCT )'");
Ok(())
}
+
+// Normalizes parts of an explain plan that vary from run to run (the test data path and the number of CPU cores)
+fn normalize_for_explain(s: &str) -> String {
+ // Convert things like /Users/alamb/Software/arrow/testing/data/csv/aggregate_test_100.csv
+ // to ARROW_TEST_DATA/csv/aggregate_test_100.csv
+ let data_path = datafusion::test_util::arrow_test_data();
+ let s = s.replace(&data_path, "ARROW_TEST_DATA");
+
+ // convert things like partitioning=RoundRobinBatch(16)
+ // to partitioning=RoundRobinBatch(NUM_CORES)
+ let needle = format!("RoundRobinBatch({})", num_cpus::get());
+ s.replace(&needle, "RoundRobinBatch(NUM_CORES)")
+}
+
+/// Applies normalize_for_explain to every cell of every row
+fn normalize_vec_for_explain(v: Vec<Vec<String>>) -> Vec<Vec<String>> {
+ v.into_iter()
+ .map(|l| {
+ l.into_iter()
+ .map(|s| normalize_for_explain(&s))
+ .collect::<Vec<_>>()
+ })
+ .collect::<Vec<_>>()
+}
diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs
index 21b4963..e1f8c76 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -163,9 +163,9 @@ async fn topk_plan() -> Result<()> {
let mut ctx = setup_table(make_topk_context()).await?;
let expected = vec![
- "| logical_plan after topk | TopK: k=3 |",
- "| | Projection: #sales.customer_id, #sales.revenue |",
- "| | TableScan: sales projection=Some([0, 1]) |",
+ "| logical_plan after topk | TopK: k=3 |",
+ "| | Projection: #sales.customer_id, #sales.revenue |",
+ "| | TableScan: sales projection=Some([0, 1]) |",
].join("\n");
let explain_query = format!("EXPLAIN VERBOSE {}", QUERY);