You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/20 19:06:28 UTC
[arrow-datafusion] branch main updated: Display all partitions and files in EXPLAIN VERBOSE (#6711)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new f0e02abb67 Display all partitions and files in EXPLAIN VERBOSE (#6711)
f0e02abb67 is described below
commit f0e02abb67c03f89a5aae81adcfcb62cb5320957
Author: Kirill Zaborsky <qr...@gmail.com>
AuthorDate: Tue Jun 20 22:06:23 2023 +0300
Display all partitions and files in EXPLAIN VERBOSE (#6711)
Adds DisplayAs trait for structs which could show more details when
formatted in the verbose mode
Resolves https://github.com/apache/arrow-datafusion/issues/6383
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
benchmarks/src/bin/tpch.rs | 4 +-
datafusion/core/src/datasource/file_format/csv.rs | 25 +--
datafusion/core/src/datasource/memory.rs | 16 +-
.../src/datasource/physical_plan/arrow_file.rs | 9 +-
.../core/src/datasource/physical_plan/avro.rs | 9 +-
.../core/src/datasource/physical_plan/csv.rs | 14 +-
.../core/src/datasource/physical_plan/json.rs | 9 +-
.../core/src/datasource/physical_plan/mod.rs | 185 ++++++++++++++++-----
.../core/src/datasource/physical_plan/parquet.rs | 17 +-
.../combine_partial_final_agg.rs | 2 +-
.../src/physical_optimizer/dist_enforcement.rs | 4 +-
.../core/src/physical_optimizer/join_selection.rs | 2 +-
.../core/src/physical_optimizer/repartition.rs | 2 +-
.../src/physical_optimizer/sort_enforcement.rs | 4 +-
.../core/src/physical_plan/aggregates/mod.rs | 2 +-
datafusion/core/src/physical_plan/analyze.rs | 6 +-
.../core/src/physical_plan/coalesce_batches.rs | 2 +-
.../core/src/physical_plan/coalesce_partitions.rs | 2 +-
datafusion/core/src/physical_plan/display.rs | 55 ++++--
datafusion/core/src/physical_plan/empty.rs | 2 +-
datafusion/core/src/physical_plan/explain.rs | 2 +-
datafusion/core/src/physical_plan/filter.rs | 2 +-
datafusion/core/src/physical_plan/insert.rs | 12 +-
.../core/src/physical_plan/joins/cross_join.rs | 2 +-
.../core/src/physical_plan/joins/hash_join.rs | 2 +-
.../src/physical_plan/joins/nested_loop_join.rs | 2 +-
.../src/physical_plan/joins/sort_merge_join.rs | 2 +-
.../src/physical_plan/joins/symmetric_hash_join.rs | 6 +-
datafusion/core/src/physical_plan/limit.rs | 4 +-
datafusion/core/src/physical_plan/memory.rs | 2 +-
datafusion/core/src/physical_plan/mod.rs | 6 +-
datafusion/core/src/physical_plan/projection.rs | 2 +-
.../core/src/physical_plan/repartition/mod.rs | 2 +-
datafusion/core/src/physical_plan/sorts/sort.rs | 2 +-
.../physical_plan/sorts/sort_preserving_merge.rs | 2 +-
datafusion/core/src/physical_plan/union.rs | 4 +-
datafusion/core/src/physical_plan/unnest.rs | 2 +-
datafusion/core/src/physical_plan/values.rs | 2 +-
.../windows/bounded_window_agg_exec.rs | 2 +-
.../src/physical_plan/windows/window_agg_exec.rs | 2 +-
datafusion/core/src/physical_planner.rs | 22 +--
datafusion/core/src/test/exec.rs | 12 +-
datafusion/core/src/test_util/mod.rs | 2 +-
datafusion/core/tests/custom_sources.rs | 2 +-
.../provider_filter_pushdown.rs | 2 +-
.../core/tests/custom_sources_cases/statistics.rs | 2 +-
datafusion/core/tests/sql/explain_analyze.rs | 4 +-
datafusion/core/tests/sql/order.rs | 2 +-
datafusion/core/tests/user_defined_plan.rs | 2 +-
.../tests/cases/roundtrip_physical_plan.rs | 7 +-
50 files changed, 319 insertions(+), 171 deletions(-)
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 4ba8b26bba..32359dc1f8 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -303,14 +303,14 @@ async fn execute_query(
if debug {
println!(
"=== Physical plan ===\n{}\n",
- displayable(physical_plan.as_ref()).indent()
+ displayable(physical_plan.as_ref()).indent(true)
);
}
let result = collect(physical_plan.clone(), state.task_ctx()).await?;
if debug {
println!(
"=== Physical plan with metrics ===\n{}\n",
- DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent()
+ DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent(true)
);
if !result.is_empty() {
// do not call print_batches if there are no batches as the result is confusing
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 01bf76ccf4..dd6d3cd7f7 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -20,7 +20,7 @@
use std::any::Any;
use std::collections::HashSet;
use std::fmt;
-use std::fmt::{Debug, Display};
+use std::fmt::Debug;
use std::sync::Arc;
use arrow::csv::WriterBuilder;
@@ -51,7 +51,7 @@ use crate::datasource::physical_plan::{
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::insert::{DataSink, InsertExec};
-use crate::physical_plan::Statistics;
+use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
/// The default file extension of csv files
@@ -443,14 +443,19 @@ impl Debug for CsvSink {
}
}
-impl Display for CsvSink {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(
- f,
- "CsvSink(writer_mode={:?}, file_groups={})",
- self.config.writer_mode,
- FileGroupDisplay(&self.config.file_groups),
- )
+impl DisplayAs for CsvSink {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+ "CsvSink(writer_mode={:?}, file_groups=",
+ self.config.writer_mode
+ )?;
+ FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+ write!(f, ")")
+ }
+ }
}
}
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 3f6316d28a..b7cb013eba 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -20,7 +20,7 @@
use futures::StreamExt;
use log::debug;
use std::any::Any;
-use std::fmt::{self, Debug, Display};
+use std::fmt::{self, Debug};
use std::sync::Arc;
use arrow::datatypes::SchemaRef;
@@ -36,9 +36,9 @@ use crate::logical_expr::Expr;
use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::insert::{DataSink, InsertExec};
use crate::physical_plan::memory::MemoryExec;
-use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::{common, SendableRecordBatchStream};
use crate::physical_plan::{repartition::RepartitionExec, Partitioning};
+use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
/// Type alias for partition data
pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
@@ -213,10 +213,14 @@ impl Debug for MemSink {
}
}
-impl Display for MemSink {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- let partition_count = self.batches.len();
- write!(f, "MemoryTable (partitions={partition_count})")
+impl DisplayAs for MemSink {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let partition_count = self.batches.len();
+ write!(f, "MemoryTable (partitions={partition_count})")
+ }
+ }
}
}
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 43074ccb77..0003ffea8c 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -22,7 +22,7 @@ use crate::datasource::physical_plan::{
use crate::error::Result;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
- ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
+ ordering_equivalence_properties_helper, DisplayAs, DisplayFormatType, ExecutionPlan,
Partitioning, SendableRecordBatchStream,
};
use arrow_schema::SchemaRef;
@@ -137,11 +137,8 @@ impl ExecutionPlan for ArrowExec {
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default => {
- write!(f, "ArrowExec: {}", self.base_config)
- }
- }
+ write!(f, "ArrowExec: ")?;
+ self.base_config.fmt_as(t, f)
}
fn statistics(&self) -> Statistics {
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
index 0c286ba19c..df82ccca29 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -20,7 +20,7 @@ use crate::error::Result;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
- ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
+ ordering_equivalence_properties_helper, DisplayAs, DisplayFormatType, ExecutionPlan,
Partitioning, SendableRecordBatchStream, Statistics,
};
use datafusion_execution::TaskContext;
@@ -146,11 +146,8 @@ impl ExecutionPlan for AvroExec {
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default => {
- write!(f, "AvroExec: {}", self.base_config)
- }
- }
+ write!(f, "AvroExec: ")?;
+ self.base_config.fmt_as(t, f)
}
fn statistics(&self) -> Statistics {
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index d2c76ecaf5..027bd1945b 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -27,7 +27,7 @@ use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
- ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
+ ordering_equivalence_properties_helper, DisplayAs, DisplayFormatType, ExecutionPlan,
Partitioning, SendableRecordBatchStream, Statistics,
};
use arrow::csv;
@@ -177,15 +177,9 @@ impl ExecutionPlan for CsvExec {
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default => {
- write!(
- f,
- "CsvExec: {}, has_header={}",
- self.base_config, self.has_header,
- )
- }
- }
+ write!(f, "CsvExec: ")?;
+ self.base_config.fmt_as(t, f)?;
+ write!(f, ", has_header={}", self.has_header)
}
fn statistics(&self) -> Statistics {
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 8340c282a0..b736fd7839 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -26,7 +26,7 @@ use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
- ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
+ ordering_equivalence_properties_helper, DisplayAs, DisplayFormatType, ExecutionPlan,
Partitioning, SendableRecordBatchStream, Statistics,
};
use datafusion_execution::TaskContext;
@@ -155,11 +155,8 @@ impl ExecutionPlan for NdJsonExec {
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default => {
- write!(f, "JsonExec: {}", self.base_config)
- }
- }
+ write!(f, "JsonExec: ")?;
+ self.base_config.fmt_as(t, f)
}
fn statistics(&self) -> Statistics {
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index ad22880a43..610b5ee424 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -44,12 +44,15 @@ pub use file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
pub(crate) use json::plan_to_json;
pub use json::{JsonOpener, NdJsonExec};
-use crate::datasource::file_format::FileWriterMode;
use crate::datasource::{
listing::{FileRange, PartitionedFile},
object_store::ObjectStoreUrl,
};
use crate::physical_plan::ExecutionPlan;
+use crate::{
+ datasource::file_format::FileWriterMode,
+ physical_plan::{DisplayAs, DisplayFormatType},
+};
use crate::{
error::{DataFusionError, Result},
scalar::ScalarValue,
@@ -269,15 +272,16 @@ impl Debug for FileScanConfig {
write!(f, "statistics={:?}, ", self.statistics)?;
- Display::fmt(self, f)
+ DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)
}
}
-impl Display for FileScanConfig {
- fn fmt(&self, f: &mut Formatter) -> FmtResult {
+impl DisplayAs for FileScanConfig {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
let (schema, _, orderings) = self.project();
- write!(f, "file_groups={}", FileGroupsDisplay(&self.file_groups))?;
+ write!(f, "file_groups=")?;
+ FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
if !schema.fields().is_empty() {
write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
@@ -310,16 +314,25 @@ impl Display for FileScanConfig {
#[derive(Debug)]
struct FileGroupsDisplay<'a>(&'a [Vec<PartitionedFile>]);
-impl<'a> Display for FileGroupsDisplay<'a> {
- fn fmt(&self, f: &mut Formatter) -> FmtResult {
+impl<'a> DisplayAs for FileGroupsDisplay<'a> {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
let n_groups = self.0.len();
let groups = if n_groups == 1 { "group" } else { "groups" };
write!(f, "{{{n_groups} {groups}: [")?;
- // To avoid showing too many partitions
- let max_groups = 5;
- fmt_up_to_n_elements(self.0, max_groups, f, |group, f| {
- write!(f, "{}", FileGroupDisplay(group))
- })?;
+ match t {
+ DisplayFormatType::Default => {
+ // To avoid showing too many partitions
+ let max_groups = 5;
+ fmt_up_to_n_elements(self.0, max_groups, f, |group, f| {
+ FileGroupDisplay(group).fmt_as(t, f)
+ })?;
+ }
+ DisplayFormatType::Verbose => {
+ fmt_elements_split_by_commas(self.0.iter(), f, |group, f| {
+ FileGroupDisplay(group).fmt_as(t, f)
+ })?
+ }
+ }
write!(f, "]}}")
}
}
@@ -333,18 +346,31 @@ impl<'a> Display for FileGroupsDisplay<'a> {
#[derive(Debug)]
pub(crate) struct FileGroupDisplay<'a>(pub &'a [PartitionedFile]);
-impl<'a> Display for FileGroupDisplay<'a> {
- fn fmt(&self, f: &mut Formatter) -> FmtResult {
+impl<'a> DisplayAs for FileGroupDisplay<'a> {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
write!(f, "[")?;
- // To avoid showing too many files
- let max_files = 5;
- fmt_up_to_n_elements(self.0, max_files, f, |pf, f| {
- write!(f, "{}", pf.object_meta.location.as_ref())?;
- if let Some(range) = pf.range.as_ref() {
- write!(f, ":{}..{}", range.start, range.end)?;
+ match t {
+ DisplayFormatType::Default => {
+ // To avoid showing too many files
+ let max_files = 5;
+ fmt_up_to_n_elements(self.0, max_files, f, |pf, f| {
+ write!(f, "{}", pf.object_meta.location.as_ref())?;
+ if let Some(range) = pf.range.as_ref() {
+ write!(f, ":{}..{}", range.start, range.end)?;
+ }
+ Ok(())
+ })?
+ }
+ DisplayFormatType::Verbose => {
+ fmt_elements_split_by_commas(self.0.iter(), f, |pf, f| {
+ write!(f, "{}", pf.object_meta.location.as_ref())?;
+ if let Some(range) = pf.range.as_ref() {
+ write!(f, ":{}..{}", range.start, range.end)?;
+ }
+ Ok(())
+ })?
}
- Ok(())
- })?;
+ }
write!(f, "]")
}
}
@@ -360,16 +386,32 @@ where
F: Fn(&E, &mut Formatter) -> FmtResult,
{
let len = elements.len();
- for (idx, element) in elements.iter().take(n).enumerate() {
+ fmt_elements_split_by_commas(elements.iter().take(n), f, |element, f| {
+ format_element(element, f)
+ })?;
+ // Remaining elements are shown as `...` (to indicate there are more)
+ if len > n {
+ write!(f, ", ...")?;
+ }
+ Ok(())
+}
+
+/// Helper that formats each element yielded by `iter`, separating consecutive elements with a comma and a space
+fn fmt_elements_split_by_commas<E, I, F>(
+ iter: I,
+ f: &mut Formatter,
+ format_element: F,
+) -> FmtResult
+where
+ I: Iterator<Item = E>,
+ F: Fn(E, &mut Formatter) -> FmtResult,
+{
+ for (idx, element) in iter.enumerate() {
if idx > 0 {
write!(f, ", ")?;
}
format_element(element, f)?;
}
- // Remaining elements are showed as `...` (to indicate there is more)
- if len > n {
- write!(f, ", ...")?;
- }
Ok(())
}
@@ -888,6 +930,7 @@ mod tests {
};
use chrono::Utc;
+ use crate::physical_plan::{DefaultDisplay, VerboseDisplay};
use crate::{
test::{build_table_i32, columns},
test_util::aggr_test_schema,
@@ -1289,7 +1332,7 @@ mod tests {
#[test]
fn file_groups_display_empty() {
let expected = "{0 groups: []}";
- assert_eq!(&FileGroupsDisplay(&[]).to_string(), expected);
+ assert_eq!(DefaultDisplay(FileGroupsDisplay(&[])).to_string(), expected);
}
#[test]
@@ -1297,11 +1340,29 @@ mod tests {
let files = [vec![partitioned_file("foo"), partitioned_file("bar")]];
let expected = "{1 group: [[foo, bar]]}";
- assert_eq!(&FileGroupsDisplay(&files).to_string(), expected);
+ assert_eq!(
+ DefaultDisplay(FileGroupsDisplay(&files)).to_string(),
+ expected
+ );
+ }
+
+ #[test]
+ fn file_groups_display_many_default() {
+ let files = [
+ vec![partitioned_file("foo"), partitioned_file("bar")],
+ vec![partitioned_file("baz")],
+ vec![],
+ ];
+
+ let expected = "{3 groups: [[foo, bar], [baz], []]}";
+ assert_eq!(
+ DefaultDisplay(FileGroupsDisplay(&files)).to_string(),
+ expected
+ );
}
#[test]
- fn file_groups_display_many() {
+ fn file_groups_display_many_verbose() {
let files = [
vec![partitioned_file("foo"), partitioned_file("bar")],
vec![partitioned_file("baz")],
@@ -1309,11 +1370,14 @@ mod tests {
];
let expected = "{3 groups: [[foo, bar], [baz], []]}";
- assert_eq!(&FileGroupsDisplay(&files).to_string(), expected);
+ assert_eq!(
+ VerboseDisplay(FileGroupsDisplay(&files)).to_string(),
+ expected
+ );
}
#[test]
- fn file_groups_display_too_many() {
+ fn file_groups_display_too_many_default() {
let files = [
vec![partitioned_file("foo"), partitioned_file("bar")],
vec![partitioned_file("baz")],
@@ -1325,19 +1389,45 @@ mod tests {
];
let expected = "{7 groups: [[foo, bar], [baz], [qux], [quux], [quuux], ...]}";
- assert_eq!(&FileGroupsDisplay(&files).to_string(), expected);
+ assert_eq!(
+ DefaultDisplay(FileGroupsDisplay(&files)).to_string(),
+ expected
+ );
+ }
+
+ #[test]
+ fn file_groups_display_too_many_verbose() {
+ let files = [
+ vec![partitioned_file("foo"), partitioned_file("bar")],
+ vec![partitioned_file("baz")],
+ vec![partitioned_file("qux")],
+ vec![partitioned_file("quux")],
+ vec![partitioned_file("quuux")],
+ vec![partitioned_file("quuuux")],
+ vec![],
+ ];
+
+ let expected =
+ "{7 groups: [[foo, bar], [baz], [qux], [quux], [quuux], [quuuux], []]}";
+ assert_eq!(
+ VerboseDisplay(FileGroupsDisplay(&files)).to_string(),
+ expected
+ );
}
#[test]
- fn file_group_display_many() {
+ fn file_group_display_many_default() {
let files = vec![partitioned_file("foo"), partitioned_file("bar")];
let expected = "[foo, bar]";
- assert_eq!(&FileGroupDisplay(&files).to_string(), expected);
+ assert_eq!(
+ DefaultDisplay(FileGroupDisplay(&files)).to_string(),
+ expected
+ );
}
#[test]
- fn file_group_display_too_many() {
+ fn file_group_display_too_many_default() {
let files = vec![
partitioned_file("foo"),
partitioned_file("bar"),
@@ -1348,7 +1438,28 @@ mod tests {
];
let expected = "[foo, bar, baz, qux, quux, ...]";
- assert_eq!(&FileGroupDisplay(&files).to_string(), expected);
+ assert_eq!(
+ DefaultDisplay(FileGroupDisplay(&files)).to_string(),
+ expected
+ );
+ }
+
+ #[test]
+ fn file_group_display_too_many_verbose() {
+ let files = vec![
+ partitioned_file("foo"),
+ partitioned_file("bar"),
+ partitioned_file("baz"),
+ partitioned_file("qux"),
+ partitioned_file("quux"),
+ partitioned_file("quuux"),
+ ];
+
+ let expected = "[foo, bar, baz, qux, quux, quuux]";
+ assert_eq!(
+ VerboseDisplay(FileGroupDisplay(&files)).to_string(),
+ expected
+ );
}
/// create a PartitionedFile for testing
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 48e4d49371..f538255bc2 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -21,7 +21,8 @@ use crate::datasource::physical_plan::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::datasource::physical_plan::{
- parquet::page_filter::PagePruningPredicate, FileMeta, FileScanConfig, SchemaAdapter,
+ parquet::page_filter::PagePruningPredicate, DisplayAs, FileMeta, FileScanConfig,
+ SchemaAdapter,
};
use crate::{
config::ConfigOptions,
@@ -418,7 +419,7 @@ impl ExecutionPlan for ParquetExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
let predicate_string = self
.predicate
.as_ref()
@@ -431,11 +432,9 @@ impl ExecutionPlan for ParquetExec {
.map(|pre| format!(", pruning_predicate={}", pre.predicate_expr()))
.unwrap_or_default();
- write!(
- f,
- "ParquetExec: {}{}{}",
- self.base_config, predicate_string, pruning_predicate_string,
- )
+ write!(f, "ParquetExec: ")?;
+ self.base_config.fmt_as(t, f)?;
+ write!(f, "{}{}", predicate_string, pruning_predicate_string,)
}
}
}
@@ -1869,7 +1868,9 @@ mod tests {
assert!(pruning_predicate.is_some());
// convert to explain plan form
- let display = displayable(rt.parquet_exec.as_ref()).indent().to_string();
+ let display = displayable(rt.parquet_exec.as_ref())
+ .indent(true)
+ .to_string();
assert_contains!(
&display,
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index 3ec9e9bbd0..05b0dc4f19 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -225,7 +225,7 @@ mod tests {
let config = ConfigOptions::new();
let optimized = optimizer.optimize($PLAN, &config)?;
// Now format correctly
- let plan = displayable(optimized.as_ref()).indent().to_string();
+ let plan = displayable(optimized.as_ref()).indent(true).to_string();
let actual_lines = trim_plan_display(&plan);
assert_eq!(
diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index 4e456450bc..89107ac989 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -1174,7 +1174,7 @@ mod tests {
let optimized = optimizer.optimize(optimized, &config)?;
// Now format correctly
- let plan = displayable(optimized.as_ref()).indent().to_string();
+ let plan = displayable(optimized.as_ref()).indent(true).to_string();
let actual_lines = trim_plan_display(&plan);
assert_eq!(
@@ -1189,7 +1189,7 @@ mod tests {
($EXPECTED_LINES: expr, $PLAN: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
// Now format correctly
- let plan = displayable($PLAN.as_ref()).indent().to_string();
+ let plan = displayable($PLAN.as_ref()).indent(true).to_string();
let actual_lines = trim_plan_display(&plan);
assert_eq!(
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index a97ef6a3f9..accf2d5643 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -632,7 +632,7 @@ mod tests {
.optimize(Arc::new($PLAN), &ConfigOptions::new())
.unwrap();
- let plan = displayable(optimized.as_ref()).indent().to_string();
+ let plan = displayable(optimized.as_ref()).indent(true).to_string();
let actual_lines = plan.split("\n").collect::<Vec<&str>>();
assert_eq!(
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index fb867ff36c..33ebcec6a6 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -557,7 +557,7 @@ mod tests {
});
// Now format correctly
- let plan = displayable(optimized.as_ref()).indent().to_string();
+ let plan = displayable(optimized.as_ref()).indent(true).to_string();
let actual_lines = trim_plan_display(&plan);
assert_eq!(
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 727380adfc..03df84fa57 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -1025,7 +1025,7 @@ mod tests {
// Util function to get string representation of a physical plan
fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
- let formatted = displayable(plan.as_ref()).indent().to_string();
+ let formatted = displayable(plan.as_ref()).indent(true).to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
actual.iter().map(|elem| elem.to_string()).collect()
}
@@ -1401,7 +1401,7 @@ mod tests {
let state = session_ctx.state();
let physical_plan = $PLAN;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+ let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
let expected_plan_lines: Vec<&str> = $EXPECTED_PLAN_LINES
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index a2ae41de1f..8e68450078 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -827,7 +827,7 @@ impl ExecutionPlan for AggregateExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "AggregateExec: mode={:?}", self.mode)?;
let g: Vec<String> = if self.group_by.groups.len() == 1 {
self.group_by
diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index 2e4441e307..90b7f4c02f 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -164,7 +164,7 @@ impl ExecutionPlan for AnalyzeExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "AnalyzeExec verbose={}", self.verbose)
}
}
@@ -191,7 +191,7 @@ fn create_output_batch(
type_builder.append_value("Plan with Metrics");
let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref())
- .indent()
+ .indent(verbose)
.to_string();
plan_builder.append_value(annotated_plan);
@@ -201,7 +201,7 @@ fn create_output_batch(
type_builder.append_value("Plan with Full Metrics");
let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref())
- .indent()
+ .indent(verbose)
.to_string();
plan_builder.append_value(annotated_plan);
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index d6b34e6bf1..0064f353e9 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -147,7 +147,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"CoalesceBatchesExec: target_batch_size={}",
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index d05c413caf..d04ffa576a 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -152,7 +152,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "CoalescePartitionsExec")
}
}
diff --git a/datafusion/core/src/physical_plan/display.rs b/datafusion/core/src/physical_plan/display.rs
index 2fba06ed29..674cbcf247 100644
--- a/datafusion/core/src/physical_plan/display.rs
+++ b/datafusion/core/src/physical_plan/display.rs
@@ -21,7 +21,7 @@
use std::fmt;
-use datafusion_common::display::{StringifiedPlan, ToStringifiedPlan};
+use datafusion_common::display::StringifiedPlan;
use super::{accept, ExecutionPlan, ExecutionPlanVisitor};
@@ -30,6 +30,8 @@ use super::{accept, ExecutionPlan, ExecutionPlanVisitor};
pub enum DisplayFormatType {
/// Default, compact format. Example: `FilterExec: c12 < 10.0`
Default,
+ /// Verbose, showing all available details
+ Verbose,
}
/// Wraps an `ExecutionPlan` with various ways to display this plan
@@ -79,16 +81,21 @@ impl<'a> DisplayableExecutionPlan<'a> {
/// RepartitionExec: partitioning=RoundRobinBatch(16)
/// CsvExec: source=...",
/// ```
- pub fn indent(&self) -> impl fmt::Display + 'a {
+ pub fn indent(&self, verbose: bool) -> impl fmt::Display + 'a {
+ let format_type = if verbose {
+ DisplayFormatType::Verbose
+ } else {
+ DisplayFormatType::Default
+ };
struct Wrapper<'a> {
+ format_type: DisplayFormatType,
plan: &'a dyn ExecutionPlan,
show_metrics: ShowMetrics,
}
impl<'a> fmt::Display for Wrapper<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- let t = DisplayFormatType::Default;
let mut visitor = IndentVisitor {
- t,
+ t: self.format_type,
f,
indent: 0,
show_metrics: self.show_metrics,
@@ -97,6 +104,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
}
}
Wrapper {
+ format_type,
plan: self.inner,
show_metrics: self.show_metrics,
}
@@ -128,6 +136,15 @@ impl<'a> DisplayableExecutionPlan<'a> {
show_metrics: self.show_metrics,
}
}
+
+ /// format as a `StringifiedPlan`
+ pub fn to_stringified(
+ &self,
+ verbose: bool,
+ plan_type: crate::logical_expr::PlanType,
+ ) -> StringifiedPlan {
+ StringifiedPlan::new(plan_type, self.indent(verbose).to_string())
+ }
}
#[derive(Debug, Clone, Copy)]
@@ -192,11 +209,29 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
}
}
-impl<'a> ToStringifiedPlan for DisplayableExecutionPlan<'a> {
- fn to_stringified(
- &self,
- plan_type: crate::logical_expr::PlanType,
- ) -> StringifiedPlan {
- StringifiedPlan::new(plan_type, self.indent().to_string())
+/// Trait for types which could have additional details when formatted in `Verbose` mode
+pub trait DisplayAs {
+ /// Format according to `DisplayFormatType`, used when verbose representation looks
+ /// different from the default one
+ ///
+ /// Should not include a newline
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result;
+}
+
+/// A newtype wrapper to display `T` implementing `DisplayAs` using the `Default` mode
+pub struct DefaultDisplay<T>(pub T);
+
+impl<T: DisplayAs> fmt::Display for DefaultDisplay<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.0.fmt_as(DisplayFormatType::Default, f)
+ }
+}
+
+/// A newtype wrapper to display `T` implementing `DisplayAs` using the `Verbose` mode
+pub struct VerboseDisplay<T>(pub T);
+
+impl<T: DisplayAs> fmt::Display for VerboseDisplay<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.0.fmt_as(DisplayFormatType::Verbose, f)
}
}
diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs
index 627444ffd9..4d1b1cffc0 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -154,7 +154,7 @@ impl ExecutionPlan for EmptyExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "EmptyExec: produce_one_row={}", self.produce_one_row)
}
}
diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs
index e40512f1b1..22b0817c33 100644
--- a/datafusion/core/src/physical_plan/explain.rs
+++ b/datafusion/core/src/physical_plan/explain.rs
@@ -162,7 +162,7 @@ impl ExecutionPlan for ExplainExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "ExplainExec")
}
}
diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index a6f00846de..d56f582028 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -163,7 +163,7 @@ impl ExecutionPlan for FilterExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "FilterExec: {}", self.predicate)
}
}
diff --git a/datafusion/core/src/physical_plan/insert.rs b/datafusion/core/src/physical_plan/insert.rs
index f3bd701a56..4742e1617e 100644
--- a/datafusion/core/src/physical_plan/insert.rs
+++ b/datafusion/core/src/physical_plan/insert.rs
@@ -19,7 +19,8 @@
use super::expressions::PhysicalSortExpr;
use super::{
- DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
+ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+ Statistics,
};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
@@ -31,7 +32,7 @@ use datafusion_common::Result;
use datafusion_physical_expr::PhysicalSortRequirement;
use futures::StreamExt;
use std::any::Any;
-use std::fmt::{Debug, Display};
+use std::fmt::Debug;
use std::sync::Arc;
use crate::physical_plan::stream::RecordBatchStreamAdapter;
@@ -45,7 +46,7 @@ use datafusion_execution::TaskContext;
-/// The `Display` impl is used to format the sink for explain plan
+/// The `DisplayAs` impl is used to format the sink for explain plan
/// output.
#[async_trait]
-pub trait DataSink: Display + Debug + Send + Sync {
+pub trait DataSink: DisplayAs + Debug + Send + Sync {
// TODO add desired input ordering
// How does this sink want its input ordered?
@@ -185,8 +186,9 @@ impl ExecutionPlan for InsertExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
- write!(f, "InsertExec: sink={}", self.sink)
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "InsertExec: sink=")?;
+ self.sink.fmt_as(t, f)
}
}
}
diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs
index eb567ee130..f752d134a1 100644
--- a/datafusion/core/src/physical_plan/joins/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -249,7 +249,7 @@ impl ExecutionPlan for CrossJoinExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "CrossJoinExec")
}
}
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 9d016a60f4..5a0e8f33f5 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -418,7 +418,7 @@ impl ExecutionPlan for HashJoinExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
let display_filter = self.filter.as_ref().map_or_else(
|| "".to_string(),
|f| format!(", filter={}", f.expression()),
diff --git a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
index 82e677f720..8de5c76e51 100644
--- a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
+++ b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
@@ -251,7 +251,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
let display_filter = self.filter.as_ref().map_or_else(
|| "".to_string(),
|f| format!(", filter={:?}", f.expression()),
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index aa6a77925e..d87023093f 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -363,7 +363,7 @@ impl ExecutionPlan for SortMergeJoinExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"SortMergeJoin: join_type={:?}, on={:?}",
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index f2b750c0b6..68a8596d66 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -462,7 +462,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
let display_filter = self.filter.as_ref().map_or_else(
|| "".to_string(),
|f| format!(", filter={}", f.expression()),
@@ -2624,7 +2624,7 @@ mod tests {
let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
let dataframe = ctx.sql(sql).await?;
let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+ let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
let expected = {
[
"SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
@@ -2677,7 +2677,7 @@ mod tests {
let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
let dataframe = ctx.sql(sql).await?;
let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+ let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
let expected = {
[
"SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs
index 132bae6141..d1948cc288 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -170,7 +170,7 @@ impl ExecutionPlan for GlobalLimitExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"GlobalLimitExec: skip={}, fetch={}",
@@ -337,7 +337,7 @@ impl ExecutionPlan for LocalLimitExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "LocalLimitExec: fetch={}", self.fetch)
}
}
diff --git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs
index 38fa5d549c..fa456d15ee 100644
--- a/datafusion/core/src/physical_plan/memory.rs
+++ b/datafusion/core/src/physical_plan/memory.rs
@@ -108,7 +108,7 @@ impl ExecutionPlan for MemoryExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
let partitions: Vec<_> =
self.partitions.iter().map(|b| b.len()).collect();
write!(
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 11385d994b..e3e9fec247 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -34,7 +34,7 @@ pub use datafusion_expr::Accumulator;
pub use datafusion_expr::ColumnarValue;
pub use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
use datafusion_physical_expr::equivalence::OrderingEquivalenceProperties;
-pub use display::DisplayFormatType;
+pub use display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
use futures::stream::{Stream, TryStreamExt};
use std::fmt;
use std::fmt::Debug;
@@ -312,9 +312,9 @@ pub fn with_new_children_if_necessary(
/// let dataframe = ctx.sql("SELECT a FROM example WHERE a < 5").await.unwrap();
/// let physical_plan = dataframe.create_physical_plan().await.unwrap();
///
-/// // Format using display string
+/// // Format using display string in verbose mode
/// let displayable_plan = displayable(physical_plan.as_ref());
-/// let plan_string = format!("{}", displayable_plan.indent());
+/// let plan_string = format!("{}", displayable_plan.indent(true));
///
/// let working_directory = std::env::current_dir().unwrap();
/// let normalized = Path::from_filesystem_path(working_directory).unwrap();
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index 5eb578334e..d1e1a4f9db 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -266,7 +266,7 @@ impl ExecutionPlan for ProjectionExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
let expr: Vec<String> = self
.expr
.iter()
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index d7dc54afd6..e18ba302e0 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -450,7 +450,7 @@ impl ExecutionPlan for RepartitionExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"RepartitionExec: partitioning={:?}, input_partitions={}",
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 58c257c97f..4983b0ea83 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -625,7 +625,7 @@ impl ExecutionPlan for SortExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
match self.fetch {
Some(fetch) => {
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 1195959a89..4db1fea2a4 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -207,7 +207,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))
}
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index a81c43398c..ffd3f06ee8 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -253,7 +253,7 @@ impl ExecutionPlan for UnionExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "UnionExec")
}
}
@@ -427,7 +427,7 @@ impl ExecutionPlan for InterleaveExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "InterleaveExec")
}
}
diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs
index cd42c3305f..2e5c92872f 100644
--- a/datafusion/core/src/physical_plan/unnest.rs
+++ b/datafusion/core/src/physical_plan/unnest.rs
@@ -132,7 +132,7 @@ impl ExecutionPlan for UnnestExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "UnnestExec")
}
}
diff --git a/datafusion/core/src/physical_plan/values.rs b/datafusion/core/src/physical_plan/values.rs
index d1cf6927a2..4099cfdb36 100644
--- a/datafusion/core/src/physical_plan/values.rs
+++ b/datafusion/core/src/physical_plan/values.rs
@@ -151,7 +151,7 @@ impl ExecutionPlan for ValuesExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "ValuesExec")
}
}
diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
index 3a95308503..2512776e8d 100644
--- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
@@ -305,7 +305,7 @@ impl ExecutionPlan for BoundedWindowAggExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "BoundedWindowAggExec: ")?;
let g: Vec<String> = self
.window_expr
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index f57dfbc0b6..4a0648c8f1 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -231,7 +231,7 @@ impl ExecutionPlan for WindowAggExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "WindowAggExec: ")?;
let g: Vec<String> = self
.window_expr
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 3cb6efc654..d695bcd99a 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1815,7 +1815,7 @@ impl DefaultPhysicalPlanner {
Ok(input) => {
stringified_plans.push(
displayable(input.as_ref())
- .to_stringified(InitialPhysicalPlan),
+ .to_stringified(e.verbose, InitialPhysicalPlan),
);
match self.optimize_internal(
@@ -1824,13 +1824,15 @@ impl DefaultPhysicalPlanner {
|plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = OptimizedPhysicalPlan { optimizer_name };
- stringified_plans
- .push(displayable(plan).to_stringified(plan_type));
+ stringified_plans.push(
+ displayable(plan)
+ .to_stringified(e.verbose, plan_type),
+ );
},
) {
Ok(input) => stringified_plans.push(
displayable(input.as_ref())
- .to_stringified(FinalPhysicalPlan),
+ .to_stringified(e.verbose, FinalPhysicalPlan),
),
Err(DataFusionError::Context(optimizer_name, e)) => {
let plan_type = OptimizedPhysicalPlan { optimizer_name };
@@ -1873,11 +1875,11 @@ impl DefaultPhysicalPlanner {
let optimizers = session_state.physical_optimizers();
debug!(
"Input physical plan:\n{}\n",
- displayable(plan.as_ref()).indent()
+ displayable(plan.as_ref()).indent(false)
);
trace!(
"Detailed input physical plan:\n{}",
- displayable(plan.as_ref()).indent()
+ displayable(plan.as_ref()).indent(true)
);
let mut new_plan = plan;
@@ -1903,13 +1905,13 @@ impl DefaultPhysicalPlanner {
trace!(
"Optimized physical plan by {}:\n{}\n",
optimizer.name(),
- displayable(new_plan.as_ref()).indent()
+ displayable(new_plan.as_ref()).indent(false)
);
observer(new_plan.as_ref(), optimizer.as_ref())
}
debug!(
"Optimized physical plan:\n{}\n",
- displayable(new_plan.as_ref()).indent()
+ displayable(new_plan.as_ref()).indent(false)
);
trace!("Detailed optimized physical plan:\n{:?}", new_plan);
Ok(new_plan)
@@ -2420,7 +2422,7 @@ mod tests {
} else {
panic!(
"Plan was not an explain plan: {}",
- displayable(plan.as_ref()).indent()
+ displayable(plan.as_ref()).indent(true)
);
}
}
@@ -2536,7 +2538,7 @@ mod tests {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "NoOpExecutionPlan")
}
}
diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs
index 41a0a1b4d0..32e7d4bcfe 100644
--- a/datafusion/core/src/test/exec.rs
+++ b/datafusion/core/src/test/exec.rs
@@ -231,7 +231,7 @@ impl ExecutionPlan for MockExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "MockExec")
}
}
@@ -358,7 +358,7 @@ impl ExecutionPlan for BarrierExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "BarrierExec")
}
}
@@ -437,7 +437,7 @@ impl ExecutionPlan for ErrorExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "ErrorExec")
}
}
@@ -516,7 +516,7 @@ impl ExecutionPlan for StatisticsExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"StatisticsExec: col_count={}, row_count={:?}",
@@ -611,7 +611,7 @@ impl ExecutionPlan for BlockingExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "BlockingExec",)
}
}
@@ -749,7 +749,7 @@ impl ExecutionPlan for PanicExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "PanickingExec",)
}
}
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index 01d502f29f..6715a6ba98 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -412,7 +412,7 @@ impl ExecutionPlan for UnboundedExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"UnboundableExec: unbounded={}",
diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs
index a8871639f7..7374a056d6 100644
--- a/datafusion/core/tests/custom_sources.rs
+++ b/datafusion/core/tests/custom_sources.rs
@@ -144,7 +144,7 @@ impl ExecutionPlan for CustomExecutionPlan {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "CustomExecutionPlan: projection={:#?}", self.projection)
}
}
diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
index a809832387..36b15f4359 100644
--- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
@@ -103,7 +103,7 @@ impl ExecutionPlan for CustomPlan {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "CustomPlan: batch_size={}", self.batches.len(),)
}
}
diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs
index ca83ab1cf6..73237fa857 100644
--- a/datafusion/core/tests/custom_sources_cases/statistics.rs
+++ b/datafusion/core/tests/custom_sources_cases/statistics.rs
@@ -157,7 +157,7 @@ impl ExecutionPlan for StatisticsValidation {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"StatisticsValidation: col_count={}, row_count={:?}",
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 971dea8128..974721144f 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -613,7 +613,7 @@ async fn test_physical_plan_display_indent() {
];
let normalizer = ExplainNormalizer::new();
- let actual = format!("{}", displayable(physical_plan.as_ref()).indent())
+ let actual = format!("{}", displayable(physical_plan.as_ref()).indent(true))
.trim()
.lines()
// normalize paths
@@ -659,7 +659,7 @@ async fn test_physical_plan_display_indent_multi_children() {
];
let normalizer = ExplainNormalizer::new();
- let actual = format!("{}", displayable(physical_plan.as_ref()).indent())
+ let actual = format!("{}", displayable(physical_plan.as_ref()).indent(true))
.trim()
.lines()
// normalize paths
diff --git a/datafusion/core/tests/sql/order.rs b/datafusion/core/tests/sql/order.rs
index e1a8221ecc..2054a9937c 100644
--- a/datafusion/core/tests/sql/order.rs
+++ b/datafusion/core/tests/sql/order.rs
@@ -220,7 +220,7 @@ ORDER BY 1, 2;
" ProjectionExec: expr=[column1@0 as t]",
" ValuesExec",
];
- let formatted = displayable(plan.as_ref()).indent().to_string();
+ let formatted = displayable(plan.as_ref()).indent(true).to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
assert_eq!(
expected, actual,
diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs
index c237d8e632..493d90d91a 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -484,7 +484,7 @@ impl ExecutionPlan for TopKExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
- DisplayFormatType::Default => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "TopKExec: k={}", self.k)
}
}
diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
index dd1504679a..25d60471a9 100644
--- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
@@ -66,8 +66,11 @@ async fn parquet_exec() -> Result<()> {
consumer::from_substrait_rel(&mut ctx, substrait_rel.as_ref(), &HashMap::new())
.await?;
- let expected = format!("{}", displayable(parquet_exec.as_ref()).indent());
- let actual = format!("{}", displayable(parquet_exec_roundtrip.as_ref()).indent());
+ let expected = format!("{}", displayable(parquet_exec.as_ref()).indent(true));
+ let actual = format!(
+ "{}",
+ displayable(parquet_exec_roundtrip.as_ref()).indent(true)
+ );
assert_eq!(expected, actual);
Ok(())