You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/09/08 10:23:45 UTC

[arrow-datafusion] branch main updated: feat: explain with statistics (#7459)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 73d6d5f50a feat: explain with statistics (#7459)
73d6d5f50a is described below

commit 73d6d5f50a73e43185401011bd6b53e8d1d345ec
Author: Eduard Karacharov <13...@users.noreply.github.com>
AuthorDate: Fri Sep 8 13:23:40 2023 +0300

    feat: explain with statistics (#7459)
    
    * explain with statistics
    
    * sqllogictest for parquet with statistics
---
 datafusion/common/src/config.rs                    |  4 ++
 datafusion/common/src/stats.rs                     | 21 +++++++++
 datafusion/core/src/physical_plan/analyze.rs       | 18 +++++++-
 datafusion/core/src/physical_plan/display.rs       | 54 ++++++++++++++++++++--
 datafusion/core/src/physical_planner.rs            | 34 +++++++++++++-
 datafusion/core/tests/sql/explain_analyze.rs       | 18 ++++++++
 datafusion/sqllogictest/test_files/explain.slt     | 36 +++++++++++++++
 .../sqllogictest/test_files/information_schema.slt |  1 +
 docs/source/user-guide/configs.md                  |  1 +
 9 files changed, 179 insertions(+), 8 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 5a32349c65..621cb9b64d 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -473,6 +473,10 @@ config_namespace! {
 
         /// When set to true, the explain statement will only print physical plans
         pub physical_plan_only: bool, default = false
+
+        /// When set to true, the explain statement will print operator statistics
+        /// for physical plans
+        pub show_statistics: bool, default = false
     }
 }
 
diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs
index d0f150a316..db788efef7 100644
--- a/datafusion/common/src/stats.rs
+++ b/datafusion/common/src/stats.rs
@@ -17,6 +17,8 @@
 
 //! This module provides data structures to represent statistics
 
+use std::fmt::Display;
+
 use crate::ScalarValue;
 
 /// Statistics for a relation
@@ -37,6 +39,25 @@ pub struct Statistics {
     pub is_exact: bool,
 }
 
+impl Display for Statistics {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        if self.num_rows.is_none() && self.total_byte_size.is_none() && !self.is_exact {
+            return Ok(());
+        }
+
+        let rows = self
+            .num_rows
+            .map_or_else(|| "None".to_string(), |v| v.to_string());
+        let bytes = self
+            .total_byte_size
+            .map_or_else(|| "None".to_string(), |v| v.to_string());
+
+        write!(f, "rows={}, bytes={}, exact={}", rows, bytes, self.is_exact)?;
+
+        Ok(())
+    }
+}
+
 /// Statistics for a column within a relation
 #[derive(Clone, Debug, Default, PartialEq, Eq)]
 pub struct ColumnStatistics {
diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index 0e6edc6182..98fce19a1d 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -39,6 +39,8 @@ use datafusion_execution::TaskContext;
 pub struct AnalyzeExec {
     /// control how much extra to print
     verbose: bool,
+    /// if statistics should be displayed
+    show_statistics: bool,
     /// The input plan (the plan being analyzed)
     pub(crate) input: Arc<dyn ExecutionPlan>,
     /// The output schema for RecordBatches of this exec node
@@ -47,9 +49,15 @@ pub struct AnalyzeExec {
 
 impl AnalyzeExec {
     /// Create a new AnalyzeExec
-    pub fn new(verbose: bool, input: Arc<dyn ExecutionPlan>, schema: SchemaRef) -> Self {
+    pub fn new(
+        verbose: bool,
+        show_statistics: bool,
+        input: Arc<dyn ExecutionPlan>,
+        schema: SchemaRef,
+    ) -> Self {
         AnalyzeExec {
             verbose,
+            show_statistics,
             input,
             schema,
         }
@@ -111,6 +119,7 @@ impl ExecutionPlan for AnalyzeExec {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(Self::new(
             self.verbose,
+            self.show_statistics,
             children.pop().unwrap(),
             self.schema.clone(),
         )))
@@ -143,6 +152,7 @@ impl ExecutionPlan for AnalyzeExec {
         let captured_input = self.input.clone();
         let captured_schema = self.schema.clone();
         let verbose = self.verbose;
+        let show_statistics = self.show_statistics;
 
         // future that gathers the results from all the tasks in the
         // JoinSet that computes the overall row count and final
@@ -157,6 +167,7 @@ impl ExecutionPlan for AnalyzeExec {
             let duration = Instant::now() - start;
             create_output_batch(
                 verbose,
+                show_statistics,
                 total_rows,
                 duration,
                 captured_input,
@@ -179,6 +190,7 @@ impl ExecutionPlan for AnalyzeExec {
 /// Creates the ouput of AnalyzeExec as a RecordBatch
 fn create_output_batch(
     verbose: bool,
+    show_statistics: bool,
     total_rows: usize,
     duration: std::time::Duration,
     input: Arc<dyn ExecutionPlan>,
@@ -191,6 +203,7 @@ fn create_output_batch(
     type_builder.append_value("Plan with Metrics");
 
     let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref())
+        .set_show_statistics(show_statistics)
         .indent(verbose)
         .to_string();
     plan_builder.append_value(annotated_plan);
@@ -201,6 +214,7 @@ fn create_output_batch(
         type_builder.append_value("Plan with Full Metrics");
 
         let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref())
+            .set_show_statistics(show_statistics)
             .indent(verbose)
             .to_string();
         plan_builder.append_value(annotated_plan);
@@ -245,7 +259,7 @@ mod tests {
 
         let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
         let refs = blocking_exec.refs();
-        let analyze_exec = Arc::new(AnalyzeExec::new(true, blocking_exec, schema));
+        let analyze_exec = Arc::new(AnalyzeExec::new(true, false, blocking_exec, schema));
 
         let fut = collect(analyze_exec, task_ctx);
         let mut fut = fut.boxed();
diff --git a/datafusion/core/src/physical_plan/display.rs b/datafusion/core/src/physical_plan/display.rs
index 15109cba95..3b345bdf9e 100644
--- a/datafusion/core/src/physical_plan/display.rs
+++ b/datafusion/core/src/physical_plan/display.rs
@@ -42,38 +42,49 @@ pub struct DisplayableExecutionPlan<'a> {
     inner: &'a dyn ExecutionPlan,
     /// How to show metrics
     show_metrics: ShowMetrics,
+    /// If statistics should be displayed
+    show_statistics: bool,
 }
 
 impl<'a> DisplayableExecutionPlan<'a> {
-    /// Create a wrapper around an [`'ExecutionPlan'] which can be
+    /// Create a wrapper around an [`ExecutionPlan`] which can be
     /// pretty printed in a variety of ways
     pub fn new(inner: &'a dyn ExecutionPlan) -> Self {
         Self {
             inner,
             show_metrics: ShowMetrics::None,
+            show_statistics: false,
         }
     }
 
-    /// Create a wrapper around an [`'ExecutionPlan'] which can be
+    /// Create a wrapper around an [`ExecutionPlan`] which can be
     /// pretty printed in a variety of ways that also shows aggregated
     /// metrics
     pub fn with_metrics(inner: &'a dyn ExecutionPlan) -> Self {
         Self {
             inner,
             show_metrics: ShowMetrics::Aggregated,
+            show_statistics: false,
         }
     }
 
-    /// Create a wrapper around an [`'ExecutionPlan'] which can be
+    /// Create a wrapper around an [`ExecutionPlan`] which can be
     /// pretty printed in a variety of ways that also shows all low
     /// level metrics
     pub fn with_full_metrics(inner: &'a dyn ExecutionPlan) -> Self {
         Self {
             inner,
             show_metrics: ShowMetrics::Full,
+            show_statistics: false,
         }
     }
 
+    /// Enable display of statistics
+    pub fn set_show_statistics(mut self, show_statistics: bool) -> Self {
+        self.show_statistics = show_statistics;
+        self
+    }
+
     /// Return a `format`able structure that produces a single line
     /// per node.
     ///
@@ -94,6 +105,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
             format_type: DisplayFormatType,
             plan: &'a dyn ExecutionPlan,
             show_metrics: ShowMetrics,
+            show_statistics: bool,
         }
         impl<'a> fmt::Display for Wrapper<'a> {
             fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -102,6 +114,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
                     f,
                     indent: 0,
                     show_metrics: self.show_metrics,
+                    show_statistics: self.show_statistics,
                 };
                 accept(self.plan, &mut visitor)
             }
@@ -110,6 +123,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
             format_type,
             plan: self.inner,
             show_metrics: self.show_metrics,
+            show_statistics: self.show_statistics,
         }
     }
 
@@ -128,6 +142,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
         struct Wrapper<'a> {
             plan: &'a dyn ExecutionPlan,
             show_metrics: ShowMetrics,
+            show_statistics: bool,
         }
         impl<'a> fmt::Display for Wrapper<'a> {
             fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -137,6 +152,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
                     f,
                     t,
                     show_metrics: self.show_metrics,
+                    show_statistics: self.show_statistics,
                     graphviz_builder: GraphvizBuilder::default(),
                     parents: Vec::new(),
                 };
@@ -153,6 +169,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
         Wrapper {
             plan: self.inner,
             show_metrics: self.show_metrics,
+            show_statistics: self.show_statistics,
         }
     }
 
@@ -162,6 +179,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
         struct Wrapper<'a> {
             plan: &'a dyn ExecutionPlan,
             show_metrics: ShowMetrics,
+            show_statistics: bool,
         }
 
         impl<'a> fmt::Display for Wrapper<'a> {
@@ -171,6 +189,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
                     t: DisplayFormatType::Default,
                     indent: 0,
                     show_metrics: self.show_metrics,
+                    show_statistics: self.show_statistics,
                 };
                 visitor.pre_visit(self.plan)?;
                 Ok(())
@@ -180,6 +199,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
         Wrapper {
             plan: self.inner,
             show_metrics: self.show_metrics,
+            show_statistics: self.show_statistics,
         }
     }
 
@@ -215,6 +235,8 @@ struct IndentVisitor<'a, 'b> {
     indent: usize,
     /// How to show metrics
     show_metrics: ShowMetrics,
+    /// If statistics should be displayed
+    show_statistics: bool,
 }
 
 impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
@@ -244,6 +266,9 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
                 }
             }
         }
+        if self.show_statistics {
+            write!(self.f, ", statistics=[{}]", plan.statistics())?;
+        }
         writeln!(self.f)?;
         self.indent += 1;
         Ok(true)
@@ -261,6 +286,9 @@ struct GraphvizVisitor<'a, 'b> {
     t: DisplayFormatType,
     /// How to show metrics
     show_metrics: ShowMetrics,
+    /// If statistics should be displayed
+    show_statistics: bool,
+
     graphviz_builder: GraphvizBuilder,
     /// Used to record parent node ids when visiting a plan.
     parents: Vec<usize>,
@@ -318,8 +346,24 @@ impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> {
             }
         };
 
-        self.graphviz_builder
-            .add_node(self.f, id, &label, Some(&metrics))?;
+        let statistics = if self.show_statistics {
+            format!("statistics=[{}]", plan.statistics())
+        } else {
+            "".to_string()
+        };
+
+        let delimiter = if !metrics.is_empty() && !statistics.is_empty() {
+            ", "
+        } else {
+            ""
+        };
+
+        self.graphviz_builder.add_node(
+            self.f,
+            id,
+            &label,
+            Some(&format!("{}{}{}", metrics, delimiter, statistics)),
+        )?;
 
         if let Some(parent_node_id) = self.parents.last() {
             self.graphviz_builder
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 3a59f40ede..35f5f96481 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1892,6 +1892,7 @@ impl DefaultPhysicalPlanner {
                     Ok(input) => {
                         stringified_plans.push(
                             displayable(input.as_ref())
+                                .set_show_statistics(config.show_statistics)
                                 .to_stringified(e.verbose, InitialPhysicalPlan),
                         );
 
@@ -1903,12 +1904,14 @@ impl DefaultPhysicalPlanner {
                                 let plan_type = OptimizedPhysicalPlan { optimizer_name };
                                 stringified_plans.push(
                                     displayable(plan)
+                                        .set_show_statistics(config.show_statistics)
                                         .to_stringified(e.verbose, plan_type),
                                 );
                             },
                         ) {
                             Ok(input) => stringified_plans.push(
                                 displayable(input.as_ref())
+                                    .set_show_statistics(config.show_statistics)
                                     .to_stringified(e.verbose, FinalPhysicalPlan),
                             ),
                             Err(DataFusionError::Context(optimizer_name, e)) => {
@@ -1932,7 +1935,13 @@ impl DefaultPhysicalPlanner {
         } else if let LogicalPlan::Analyze(a) = logical_plan {
             let input = self.create_physical_plan(&a.input, session_state).await?;
             let schema = SchemaRef::new((*a.schema).clone().into());
-            Ok(Some(Arc::new(AnalyzeExec::new(a.verbose, input, schema))))
+            let show_statistics = session_state.config_options().explain.show_statistics;
+            Ok(Some(Arc::new(AnalyzeExec::new(
+                a.verbose,
+                show_statistics,
+                input,
+                schema,
+            ))))
         } else {
             Ok(None)
         }
@@ -2716,4 +2725,27 @@ digraph {
 
         assert_eq!(expected_graph, generated_graph);
     }
+
+    #[tokio::test]
+    async fn test_display_graphviz_with_statistics() {
+        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
+
+        let logical_plan = scan_empty(Some("employee"), &schema, None)
+            .unwrap()
+            .project(vec![col("id") + lit(2)])
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let plan = plan(&logical_plan).await.unwrap();
+
+        let expected_tooltip = ", tooltip=\"statistics=[";
+
+        let generated_graph = format!(
+            "{}",
+            displayable(&*plan).set_show_statistics(true).graphviz()
+        );
+
+        assert_contains!(generated_graph, expected_tooltip);
+    }
 }
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 12dff15dc0..f32ffc1642 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -807,3 +807,21 @@ async fn explain_physical_plan_only() {
     ]];
     assert_eq!(expected, actual);
 }
+
+#[tokio::test]
+async fn csv_explain_analyze_with_statistics() {
+    let mut config = ConfigOptions::new();
+    config.explain.physical_plan_only = true;
+    config.explain.show_statistics = true;
+    let ctx = SessionContext::with_config(config.into());
+    register_aggregate_csv_by_sql(&ctx).await;
+
+    let sql = "EXPLAIN ANALYZE SELECT c1 FROM aggregate_test_100";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let formatted = arrow::util::pretty::pretty_format_batches(&actual)
+        .unwrap()
+        .to_string();
+
+    // should contain scan statistics
+    assert_contains!(&formatted, ", statistics=[]");
+}
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index ad9b2be40e..d44d19737a 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -251,3 +251,39 @@ physical_plan after EnforceSorting SAME TEXT AS ABOVE
 physical_plan after coalesce_batches SAME TEXT AS ABOVE
 physical_plan after PipelineChecker SAME TEXT AS ABOVE
 physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true
+
+
+### tests for EXPLAIN with display statistics enabled
+statement ok
+set datafusion.explain.show_statistics = true;
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+# CSV scan with empty statistics
+query TT
+EXPLAIN SELECT a, b, c FROM simple_explain_test limit 10;
+----
+physical_plan
+GlobalLimitExec: skip=0, fetch=10, statistics=[rows=10, bytes=None, exact=false]
+--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], limit=10, has_header=true, statistics=[]
+
+# Parquet scan with statistics collected
+statement ok
+set datafusion.execution.collect_statistics = true;
+
+statement ok
+CREATE EXTERNAL TABLE alltypes_plain STORED AS PARQUET LOCATION '../../parquet-testing/data/alltypes_plain.parquet';
+
+query TT
+EXPLAIN SELECT * FROM alltypes_plain limit 10;
+----
+physical_plan
+GlobalLimitExec: skip=0, fetch=10, statistics=[rows=8, bytes=None, exact=true]
+--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[rows=8, bytes=None, exact=true]
+
+statement ok
+set datafusion.execution.collect_statistics = false;
+
+statement ok
+set datafusion.explain.show_statistics = false;
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index 5db305105f..362f162b05 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -179,6 +179,7 @@ datafusion.execution.target_partitions 7
 datafusion.execution.time_zone +00:00
 datafusion.explain.logical_plan_only false
 datafusion.explain.physical_plan_only false
+datafusion.explain.show_statistics false
 datafusion.optimizer.allow_symmetric_joins_without_pruning true
 datafusion.optimizer.bounded_order_preserving_variants false
 datafusion.optimizer.enable_round_robin_repartition true
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 0d3abeac9f..21b0d8bbfd 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -92,6 +92,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.optimizer.hash_join_single_partition_threshold  | 1048576                   | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition                                                                                                                                                                                                                                                                                                    [...]
 | datafusion.explain.logical_plan_only                       | false                     | When set to true, the explain statement will only print logical plans                                                                                                                                                                                                                                                                                                                                             [...]
 | datafusion.explain.physical_plan_only                      | false                     | When set to true, the explain statement will only print physical plans                                                                                                                                                                                                                                                                                                                                            [...]
+| datafusion.explain.show_statistics                         | false                     | When set to true, the explain statement will print operator statistics for physical plans                                                                                                                                                                                                                                                                                                                         [...]
 | datafusion.sql_parser.parse_float_as_decimal               | false                     | When set to true, SQL parser will parse float as decimal type                                                                                                                                                                                                                                                                                                                                                     [...]
 | datafusion.sql_parser.enable_ident_normalization           | true                      | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)                                                                                                                                                                                                                                                                                                                    [...]
 | datafusion.sql_parser.dialect                              | generic                   | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi.                                                                                                                                                                                                                            [...]