You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/09/08 10:23:45 UTC
[arrow-datafusion] branch main updated: feat: explain with statistics (#7459)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 73d6d5f50a feat: explain with statistics (#7459)
73d6d5f50a is described below
commit 73d6d5f50a73e43185401011bd6b53e8d1d345ec
Author: Eduard Karacharov <13...@users.noreply.github.com>
AuthorDate: Fri Sep 8 13:23:40 2023 +0300
feat: explain with statistics (#7459)
* explain with statistics
* sqllogictest for parquet with statistics
---
datafusion/common/src/config.rs | 4 ++
datafusion/common/src/stats.rs | 21 +++++++++
datafusion/core/src/physical_plan/analyze.rs | 18 +++++++-
datafusion/core/src/physical_plan/display.rs | 54 ++++++++++++++++++++--
datafusion/core/src/physical_planner.rs | 34 +++++++++++++-
datafusion/core/tests/sql/explain_analyze.rs | 18 ++++++++
datafusion/sqllogictest/test_files/explain.slt | 36 +++++++++++++++
.../sqllogictest/test_files/information_schema.slt | 1 +
docs/source/user-guide/configs.md | 1 +
9 files changed, 179 insertions(+), 8 deletions(-)
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 5a32349c65..621cb9b64d 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -473,6 +473,10 @@ config_namespace! {
/// When set to true, the explain statement will only print physical plans
pub physical_plan_only: bool, default = false
+
+ /// When set to true, the explain statement will print operator statistics
+ /// for physical plans
+ pub show_statistics: bool, default = false
}
}
diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs
index d0f150a316..db788efef7 100644
--- a/datafusion/common/src/stats.rs
+++ b/datafusion/common/src/stats.rs
@@ -17,6 +17,8 @@
//! This module provides data structures to represent statistics
+use std::fmt::Display;
+
use crate::ScalarValue;
/// Statistics for a relation
@@ -37,6 +39,25 @@ pub struct Statistics {
pub is_exact: bool,
}
+impl Display for Statistics {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ if self.num_rows.is_none() && self.total_byte_size.is_none() && !self.is_exact {
+ return Ok(()); // nothing is known: render empty (EXPLAIN then shows "statistics=[]")
+ }
+ // Only num_rows, total_byte_size and is_exact are rendered; unknown counts print as "None".
+ let rows = self
+ .num_rows
+ .map_or_else(|| "None".to_string(), |v| v.to_string());
+ let bytes = self
+ .total_byte_size
+ .map_or_else(|| "None".to_string(), |v| v.to_string());
+ // e.g. "rows=8, bytes=None, exact=true"
+ write!(f, "rows={}, bytes={}, exact={}", rows, bytes, self.is_exact)?;
+
+ Ok(())
+ }
+}
+
/// Statistics for a column within a relation
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ColumnStatistics {
diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index 0e6edc6182..98fce19a1d 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -39,6 +39,8 @@ use datafusion_execution::TaskContext;
pub struct AnalyzeExec {
/// control how much extra to print
verbose: bool,
+ /// if statistics should be displayed
+ show_statistics: bool,
/// The input plan (the plan being analyzed)
pub(crate) input: Arc<dyn ExecutionPlan>,
/// The output schema for RecordBatches of this exec node
@@ -47,9 +49,15 @@ pub struct AnalyzeExec {
impl AnalyzeExec {
/// Create a new AnalyzeExec
- pub fn new(verbose: bool, input: Arc<dyn ExecutionPlan>, schema: SchemaRef) -> Self {
+ pub fn new(
+ verbose: bool,
+ show_statistics: bool,
+ input: Arc<dyn ExecutionPlan>,
+ schema: SchemaRef,
+ ) -> Self {
AnalyzeExec {
verbose,
+ show_statistics,
input,
schema,
}
@@ -111,6 +119,7 @@ impl ExecutionPlan for AnalyzeExec {
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self::new(
self.verbose,
+ self.show_statistics,
children.pop().unwrap(),
self.schema.clone(),
)))
@@ -143,6 +152,7 @@ impl ExecutionPlan for AnalyzeExec {
let captured_input = self.input.clone();
let captured_schema = self.schema.clone();
let verbose = self.verbose;
+ let show_statistics = self.show_statistics;
// future that gathers the results from all the tasks in the
// JoinSet that computes the overall row count and final
@@ -157,6 +167,7 @@ impl ExecutionPlan for AnalyzeExec {
let duration = Instant::now() - start;
create_output_batch(
verbose,
+ show_statistics,
total_rows,
duration,
captured_input,
@@ -179,6 +190,7 @@ impl ExecutionPlan for AnalyzeExec {
/// Creates the ouput of AnalyzeExec as a RecordBatch
fn create_output_batch(
verbose: bool,
+ show_statistics: bool,
total_rows: usize,
duration: std::time::Duration,
input: Arc<dyn ExecutionPlan>,
@@ -191,6 +203,7 @@ fn create_output_batch(
type_builder.append_value("Plan with Metrics");
let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref())
+ .set_show_statistics(show_statistics)
.indent(verbose)
.to_string();
plan_builder.append_value(annotated_plan);
@@ -201,6 +214,7 @@ fn create_output_batch(
type_builder.append_value("Plan with Full Metrics");
let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref())
+ .set_show_statistics(show_statistics)
.indent(verbose)
.to_string();
plan_builder.append_value(annotated_plan);
@@ -245,7 +259,7 @@ mod tests {
let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
let refs = blocking_exec.refs();
- let analyze_exec = Arc::new(AnalyzeExec::new(true, blocking_exec, schema));
+ let analyze_exec = Arc::new(AnalyzeExec::new(true, false, blocking_exec, schema));
let fut = collect(analyze_exec, task_ctx);
let mut fut = fut.boxed();
diff --git a/datafusion/core/src/physical_plan/display.rs b/datafusion/core/src/physical_plan/display.rs
index 15109cba95..3b345bdf9e 100644
--- a/datafusion/core/src/physical_plan/display.rs
+++ b/datafusion/core/src/physical_plan/display.rs
@@ -42,38 +42,49 @@ pub struct DisplayableExecutionPlan<'a> {
inner: &'a dyn ExecutionPlan,
/// How to show metrics
show_metrics: ShowMetrics,
+ /// If statistics should be displayed
+ show_statistics: bool,
}
impl<'a> DisplayableExecutionPlan<'a> {
- /// Create a wrapper around an [`'ExecutionPlan'] which can be
+ /// Create a wrapper around an [`ExecutionPlan`] which can be
/// pretty printed in a variety of ways
pub fn new(inner: &'a dyn ExecutionPlan) -> Self {
Self {
inner,
show_metrics: ShowMetrics::None,
+ show_statistics: false,
}
}
- /// Create a wrapper around an [`'ExecutionPlan'] which can be
+ /// Create a wrapper around an [`ExecutionPlan`] which can be
/// pretty printed in a variety of ways that also shows aggregated
/// metrics
pub fn with_metrics(inner: &'a dyn ExecutionPlan) -> Self {
Self {
inner,
show_metrics: ShowMetrics::Aggregated,
+ show_statistics: false,
}
}
- /// Create a wrapper around an [`'ExecutionPlan'] which can be
+ /// Create a wrapper around an [`ExecutionPlan`] which can be
/// pretty printed in a variety of ways that also shows all low
/// level metrics
pub fn with_full_metrics(inner: &'a dyn ExecutionPlan) -> Self {
Self {
inner,
show_metrics: ShowMetrics::Full,
+ show_statistics: false,
}
}
+ /// Set whether per-operator statistics are displayed (`true` enables, `false` disables)
+ pub fn set_show_statistics(mut self, show_statistics: bool) -> Self {
+ self.show_statistics = show_statistics;
+ self
+ }
+
/// Return a `format`able structure that produces a single line
/// per node.
///
@@ -94,6 +105,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
format_type: DisplayFormatType,
plan: &'a dyn ExecutionPlan,
show_metrics: ShowMetrics,
+ show_statistics: bool,
}
impl<'a> fmt::Display for Wrapper<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -102,6 +114,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
f,
indent: 0,
show_metrics: self.show_metrics,
+ show_statistics: self.show_statistics,
};
accept(self.plan, &mut visitor)
}
@@ -110,6 +123,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
format_type,
plan: self.inner,
show_metrics: self.show_metrics,
+ show_statistics: self.show_statistics,
}
}
@@ -128,6 +142,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
struct Wrapper<'a> {
plan: &'a dyn ExecutionPlan,
show_metrics: ShowMetrics,
+ show_statistics: bool,
}
impl<'a> fmt::Display for Wrapper<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -137,6 +152,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
f,
t,
show_metrics: self.show_metrics,
+ show_statistics: self.show_statistics,
graphviz_builder: GraphvizBuilder::default(),
parents: Vec::new(),
};
@@ -153,6 +169,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
Wrapper {
plan: self.inner,
show_metrics: self.show_metrics,
+ show_statistics: self.show_statistics,
}
}
@@ -162,6 +179,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
struct Wrapper<'a> {
plan: &'a dyn ExecutionPlan,
show_metrics: ShowMetrics,
+ show_statistics: bool,
}
impl<'a> fmt::Display for Wrapper<'a> {
@@ -171,6 +189,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
t: DisplayFormatType::Default,
indent: 0,
show_metrics: self.show_metrics,
+ show_statistics: self.show_statistics,
};
visitor.pre_visit(self.plan)?;
Ok(())
@@ -180,6 +199,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
Wrapper {
plan: self.inner,
show_metrics: self.show_metrics,
+ show_statistics: self.show_statistics,
}
}
@@ -215,6 +235,8 @@ struct IndentVisitor<'a, 'b> {
indent: usize,
/// How to show metrics
show_metrics: ShowMetrics,
+ /// If statistics should be displayed
+ show_statistics: bool,
}
impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
@@ -244,6 +266,9 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
}
}
}
+ if self.show_statistics {
+ write!(self.f, ", statistics=[{}]", plan.statistics())?;
+ }
writeln!(self.f)?;
self.indent += 1;
Ok(true)
@@ -261,6 +286,9 @@ struct GraphvizVisitor<'a, 'b> {
t: DisplayFormatType,
/// How to show metrics
show_metrics: ShowMetrics,
+ /// If statistics should be displayed
+ show_statistics: bool,
+
graphviz_builder: GraphvizBuilder,
/// Used to record parent node ids when visiting a plan.
parents: Vec<usize>,
@@ -318,8 +346,24 @@ impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> {
}
};
- self.graphviz_builder
- .add_node(self.f, id, &label, Some(&metrics))?;
+ let statistics = if self.show_statistics {
+ format!("statistics=[{}]", plan.statistics())
+ } else {
+ "".to_string()
+ };
+
+ let delimiter = if !metrics.is_empty() && !statistics.is_empty() {
+ ", "
+ } else {
+ ""
+ };
+
+ self.graphviz_builder.add_node(
+ self.f,
+ id,
+ &label,
+ Some(&format!("{}{}{}", metrics, delimiter, statistics)),
+ )?;
if let Some(parent_node_id) = self.parents.last() {
self.graphviz_builder
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 3a59f40ede..35f5f96481 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1892,6 +1892,7 @@ impl DefaultPhysicalPlanner {
Ok(input) => {
stringified_plans.push(
displayable(input.as_ref())
+ .set_show_statistics(config.show_statistics)
.to_stringified(e.verbose, InitialPhysicalPlan),
);
@@ -1903,12 +1904,14 @@ impl DefaultPhysicalPlanner {
let plan_type = OptimizedPhysicalPlan { optimizer_name };
stringified_plans.push(
displayable(plan)
+ .set_show_statistics(config.show_statistics)
.to_stringified(e.verbose, plan_type),
);
},
) {
Ok(input) => stringified_plans.push(
displayable(input.as_ref())
+ .set_show_statistics(config.show_statistics)
.to_stringified(e.verbose, FinalPhysicalPlan),
),
Err(DataFusionError::Context(optimizer_name, e)) => {
@@ -1932,7 +1935,13 @@ impl DefaultPhysicalPlanner {
} else if let LogicalPlan::Analyze(a) = logical_plan {
let input = self.create_physical_plan(&a.input, session_state).await?;
let schema = SchemaRef::new((*a.schema).clone().into());
- Ok(Some(Arc::new(AnalyzeExec::new(a.verbose, input, schema))))
+ let show_statistics = session_state.config_options().explain.show_statistics;
+ Ok(Some(Arc::new(AnalyzeExec::new(
+ a.verbose,
+ show_statistics,
+ input,
+ schema,
+ ))))
} else {
Ok(None)
}
@@ -2716,4 +2725,27 @@ digraph {
assert_eq!(expected_graph, generated_graph);
}
+
+ #[tokio::test]
+ async fn test_display_graphviz_with_statistics() {
+ let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
+ // Small plan: an empty "employee" scan projected through `id + 2`.
+ let logical_plan = scan_empty(Some("employee"), &schema, None)
+ .unwrap()
+ .project(vec![col("id") + lit(2)])
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let plan = plan(&logical_plan).await.unwrap();
+ // Only the tooltip prefix is checked; concrete statistics values are not asserted.
+ let expected_tooltip = ", tooltip=\"statistics=[";
+
+ let generated_graph = format!(
+ "{}",
+ displayable(&*plan).set_show_statistics(true).graphviz()
+ );
+
+ assert_contains!(generated_graph, expected_tooltip);
+ }
}
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 12dff15dc0..f32ffc1642 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -807,3 +807,21 @@ async fn explain_physical_plan_only() {
]];
assert_eq!(expected, actual);
}
+
+#[tokio::test]
+async fn csv_explain_analyze_with_statistics() {
+ let mut config = ConfigOptions::new();
+ config.explain.physical_plan_only = true;
+ config.explain.show_statistics = true;
+ let ctx = SessionContext::with_config(config.into());
+ register_aggregate_csv_by_sql(&ctx).await;
+
+ let sql = "EXPLAIN ANALYZE SELECT c1 FROM aggregate_test_100";
+ let actual = execute_to_batches(&ctx, sql).await;
+ let formatted = arrow::util::pretty::pretty_format_batches(&actual)
+ .unwrap()
+ .to_string();
+
+ // statistics are rendered, and at least one node (presumably the CSV scan) has none: "statistics=[]"
+ assert_contains!(&formatted, ", statistics=[]");
+}
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index ad9b2be40e..d44d19737a 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -251,3 +251,39 @@ physical_plan after EnforceSorting SAME TEXT AS ABOVE
physical_plan after coalesce_batches SAME TEXT AS ABOVE
physical_plan after PipelineChecker SAME TEXT AS ABOVE
physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true
+
+
+### tests for EXPLAIN with display statistics enabled
+statement ok
+set datafusion.explain.show_statistics = true;
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+# CSV scan with empty statistics
+query TT
+EXPLAIN SELECT a, b, c FROM simple_explain_test limit 10;
+----
+physical_plan
+GlobalLimitExec: skip=0, fetch=10, statistics=[rows=10, bytes=None, exact=false]
+--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], limit=10, has_header=true, statistics=[]
+
+# Parquet scan with statistics collected
+statement ok
+set datafusion.execution.collect_statistics = true;
+
+statement ok
+CREATE EXTERNAL TABLE alltypes_plain STORED AS PARQUET LOCATION '../../parquet-testing/data/alltypes_plain.parquet';
+
+query TT
+EXPLAIN SELECT * FROM alltypes_plain limit 10;
+----
+physical_plan
+GlobalLimitExec: skip=0, fetch=10, statistics=[rows=8, bytes=None, exact=true]
+--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[rows=8, bytes=None, exact=true]
+
+statement ok
+set datafusion.execution.collect_statistics = false;
+
+statement ok
+set datafusion.explain.show_statistics = false;
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index 5db305105f..362f162b05 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -179,6 +179,7 @@ datafusion.execution.target_partitions 7
datafusion.execution.time_zone +00:00
datafusion.explain.logical_plan_only false
datafusion.explain.physical_plan_only false
+datafusion.explain.show_statistics false
datafusion.optimizer.allow_symmetric_joins_without_pruning true
datafusion.optimizer.bounded_order_preserving_variants false
datafusion.optimizer.enable_round_robin_repartition true
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 0d3abeac9f..21b0d8bbfd 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -92,6 +92,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition [...]
| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans [...]
| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans [...]
+| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans [...]
| datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type [...]
| datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) [...]
| datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi. [...]