You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/04 15:14:12 UTC
[arrow-datafusion] branch main updated: Supply consistent format output for FileScanConfig params (#6202)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6be75ff2dc Supply consistent format output for FileScanConfig params (#6202)
6be75ff2dc is described below
commit 6be75ff2dcc47128b78a695477512ba86c46373f
Author: Tzu-Chiao Yeh <su...@gmail.com>
AuthorDate: Thu May 4 23:14:06 2023 +0800
Supply consistent format output for FileScanConfig params (#6202)
* Supply consistent format output for FileScanConfig params
* Compact display output and optimize output ordering display
* Appease clippy
* update sqllogictest
---------
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
datafusion-cli/src/object_storage.rs | 3 +-
datafusion/core/src/datasource/view.rs | 3 +-
.../combine_partial_final_agg.rs | 8 +-
.../src/physical_optimizer/dist_enforcement.rs | 108 +++++++--------
.../core/src/physical_optimizer/repartition.rs | 84 ++++++------
.../src/physical_optimizer/sort_enforcement.rs | 152 ++++++++++-----------
.../core/src/physical_plan/file_format/avro.rs | 7 +-
.../core/src/physical_plan/file_format/csv.rs | 7 +-
.../core/src/physical_plan/file_format/json.rs | 7 +-
.../core/src/physical_plan/file_format/mod.rs | 40 ++++++
.../core/src/physical_plan/file_format/parquet.rs | 30 +---
.../src/physical_plan/joins/symmetric_hash_join.rs | 8 +-
datafusion/core/src/physical_plan/mod.rs | 2 +-
datafusion/core/tests/sql/avro.rs | 2 +-
datafusion/core/tests/sql/explain_analyze.rs | 6 +-
datafusion/core/tests/sql/json.rs | 2 +-
.../tests/sqllogictests/test_files/explain.slt | 36 ++++-
.../core/tests/sqllogictests/test_files/order.slt | 2 +-
.../core/tests/sqllogictests/test_files/select.slt | 3 +-
.../core/tests/sqllogictests/test_files/window.slt | 50 +++----
docs/source/user-guide/sql/explain.md | 4 +-
21 files changed, 301 insertions(+), 263 deletions(-)
diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs
index 1fd509775d..8d29f80a4b 100644
--- a/datafusion-cli/src/object_storage.rs
+++ b/datafusion-cli/src/object_storage.rs
@@ -112,7 +112,8 @@ fn get_bucket_name(url: &Url) -> Result<&str> {
#[cfg(test)]
mod tests {
use datafusion::{
- datasource::listing::ListingTableUrl, logical_expr::{LogicalPlan, DdlStatement},
+ datasource::listing::ListingTableUrl,
+ logical_expr::{DdlStatement, LogicalPlan},
prelude::SessionContext,
};
diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs
index dea7ecadf7..391e4b93c4 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -473,7 +473,8 @@ mod tests {
let formatted = arrow::util::pretty::pretty_format_batches(&plan)
.unwrap()
.to_string();
- assert!(formatted.contains("ParquetExec: limit=Some(10)"));
+ assert!(formatted.contains("ParquetExec: "));
+ assert!(formatted.contains("projection=[bool_col, int_col], limit=10"));
Ok(())
}
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index 0674b6aa31..59e5fc95ed 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -334,7 +334,7 @@ mod tests {
"AggregateExec: mode=Final, gby=[], aggr=[COUNT(1)]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
];
assert_optimized!(expected, plan);
@@ -362,7 +362,7 @@ mod tests {
let expected = &[
"AggregateExec: mode=Final, gby=[], aggr=[COUNT(2)]",
"AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
];
assert_optimized!(expected, plan);
@@ -391,7 +391,7 @@ mod tests {
// should combine the Partial/Final AggregateExecs to the Single AggregateExec
let expected = &[
"AggregateExec: mode=Single, gby=[], aggr=[COUNT(1)]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
];
assert_optimized!(expected, plan);
@@ -425,7 +425,7 @@ mod tests {
// should combine the Partial/Final AggregateExecs to the Single AggregateExec
let expected = &[
"AggregateExec: mode=Single, gby=[c@2 as c], aggr=[Sum(b)]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
];
assert_optimized!(expected, plan);
diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index f74a7f2e93..e6fd15b7ce 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -1273,12 +1273,12 @@ mod tests {
top_join_plan.as_str(),
join_plan.as_str(),
"RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// Should include 4 RepartitionExecs
_ => vec![
@@ -1286,12 +1286,12 @@ mod tests {
"RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=10",
join_plan.as_str(),
"RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
};
assert_optimized!(expected, top_join);
@@ -1329,12 +1329,12 @@ mod tests {
top_join_plan.as_str(),
join_plan.as_str(),
"RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// Should include 4 RepartitionExecs
_ =>
@@ -1343,12 +1343,12 @@ mod tests {
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 6 }], 10), input_partitions=10",
join_plan.as_str(),
"RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
};
assert_optimized!(expected, top_join);
@@ -1398,11 +1398,11 @@ mod tests {
"ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]",
"RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, top_join);
@@ -1420,11 +1420,11 @@ mod tests {
"ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]",
"RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, top_join);
@@ -1471,11 +1471,11 @@ mod tests {
"ProjectionExec: expr=[c@2 as c1]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]",
"RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, top_join);
@@ -1508,11 +1508,11 @@ mod tests {
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
"RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
"RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 0 }], 10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[a@0 as a2], aggr=[]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, join);
Ok(())
@@ -1557,11 +1557,11 @@ mod tests {
"AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]",
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 0 }, Column { name: \"a1\", index: 1 }], 10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]",
"RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 0 }, Column { name: \"a\", index: 1 }], 10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, join);
Ok(())
@@ -1663,16 +1663,16 @@ mod tests {
"ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]",
"RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]",
"RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, filter_top_join);
Ok(())
@@ -1785,16 +1785,16 @@ mod tests {
"ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 })]",
"RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }, Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]",
"RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }, Column { name: \"b\", index: 1 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 2 }, Column { name: \"b1\", index: 1 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_plan_txt!(expected, reordered);
@@ -1906,16 +1906,16 @@ mod tests {
"ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 })]",
"RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }, Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]",
"RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }, Column { name: \"b\", index: 1 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 2 }, Column { name: \"b1\", index: 1 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_plan_txt!(expected, reordered);
@@ -1980,14 +1980,14 @@ mod tests {
join_plan.as_str(),
"SortExec: expr=[a@0 ASC]",
"RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: expr=[b1@1 ASC]",
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: expr=[c@2 ASC]",
"RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// Should include 4 RepartitionExecs
_ => vec![
@@ -1997,14 +1997,14 @@ mod tests {
join_plan.as_str(),
"SortExec: expr=[a@0 ASC]",
"RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: expr=[b1@1 ASC]",
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: expr=[c@2 ASC]",
"RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
};
assert_optimized!(expected, top_join);
@@ -2033,14 +2033,14 @@ mod tests {
join_plan.as_str(),
"SortExec: expr=[a@0 ASC]",
"RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: expr=[b1@1 ASC]",
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: expr=[c@2 ASC]",
"RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// Should include 4 RepartitionExecs and 4 SortExecs
_ => vec![
@@ -2050,14 +2050,14 @@ mod tests {
join_plan.as_str(),
"SortExec: expr=[a@0 ASC]",
"RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: expr=[b1@1 ASC]",
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: expr=[c@2 ASC]",
"RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
};
assert_optimized!(expected, top_join);
@@ -2124,13 +2124,13 @@ mod tests {
"AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]",
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 0 }, Column { name: \"a1\", index: 1 }], 10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: expr=[b2@1 ASC,a2@0 ASC]",
"ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
"AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]",
"RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 0 }, Column { name: \"a\", index: 1 }], 10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, join);
Ok(())
@@ -2159,7 +2159,7 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [a@0 ASC]",
"CoalesceBatchesExec: target_batch_size=4096",
- "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[a@0 ASC], projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
];
assert_optimized!(expected, exec);
Ok(())
@@ -2193,11 +2193,11 @@ mod tests {
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
"RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
"RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, plan);
Ok(())
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 1db61e379e..e7c72398fd 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -575,7 +575,7 @@ mod tests {
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -592,7 +592,7 @@ mod tests {
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -610,7 +610,7 @@ mod tests {
"FilterExec: c1@0",
// nothing sorts the data, so the local limit doesn't require sorted data either
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -628,7 +628,7 @@ mod tests {
"FilterExec: c1@0",
// nothing sorts the data, so the local limit doesn't require sorted data either
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -644,7 +644,7 @@ mod tests {
"LocalLimitExec: fetch=100",
// data is sorted so can't repartition here
"SortExec: expr=[c1@0 ASC]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -662,7 +662,7 @@ mod tests {
// data is sorted so can't repartition here even though
// filter would benefit from parallelism, the answers might be wrong
"SortExec: expr=[c1@0 ASC]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -687,7 +687,7 @@ mod tests {
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
// Expect no repartition to happen for local limit
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -714,7 +714,7 @@ mod tests {
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
// Expect no repartition to happen for local limit
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -730,11 +730,11 @@ mod tests {
let expected = &[
"UnionExec",
// Expect no repartition of ParquetExec
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -751,7 +751,7 @@ mod tests {
"SortPreservingMergeExec: [c1@0 ASC]",
"SortExec: expr=[c1@0 ASC]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -766,7 +766,7 @@ mod tests {
// should not repartition / sort (as the data was already sorted)
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
- "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+ "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];
assert_optimized!(expected, plan);
@@ -783,8 +783,8 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"UnionExec",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];
assert_optimized!(expected, plan);
@@ -801,7 +801,7 @@ mod tests {
// should not repartition as doing so destroys the necessary sort order
let expected = &[
"SortRequiredExec",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];
assert_optimized!(expected, plan);
@@ -830,11 +830,11 @@ mod tests {
"UnionExec",
// union input 1: no repartitioning
"SortRequiredExec",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
// union input 2: should repartition
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -852,7 +852,7 @@ mod tests {
"SortExec: expr=[c1@0 ASC]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[c1@0 as c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -869,7 +869,7 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
- "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+ "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];
assert_optimized!(expected, plan);
@@ -884,7 +884,7 @@ mod tests {
let expected = &[
"SortExec: expr=[c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -902,7 +902,7 @@ mod tests {
"SortExec: expr=[c1@0 ASC]",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -924,7 +924,7 @@ mod tests {
"FilterExec: c1@0",
// repartition is lowest down
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -939,7 +939,7 @@ mod tests {
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
- "ParquetExec: limit=None, partitions={2 groups: [[x:0..50], [x:50..100]]}, projection=[c1]",
+ "ParquetExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[c1]",
];
assert_optimized!(expected, plan, 2, true, 10);
@@ -955,7 +955,7 @@ mod tests {
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
// Plan already has two partitions
- "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, projection=[c1]",
+ "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[c1]",
];
assert_optimized!(expected, plan, 2, true, 10);
@@ -971,7 +971,7 @@ mod tests {
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
// Multiple source files splitted across partitions
- "ParquetExec: limit=None, partitions={4 groups: [[x:0..75], [x:75..100, y:0..50], [y:50..125], [y:125..200]]}, projection=[c1]",
+ "ParquetExec: file_groups={4 groups: [[x:0..75], [x:75..100, y:0..50], [y:50..125], [y:125..200]]}, projection=[c1]",
];
assert_optimized!(expected, plan, 4, true, 10);
@@ -988,7 +988,7 @@ mod tests {
// data is sorted so can't repartition here
"SortExec: expr=[c1@0 ASC]",
// Doesn't parallelize for SortExec without preserve_partitioning
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan, 2, true, 10);
@@ -1007,7 +1007,7 @@ mod tests {
// filter would benefit from parallelism, the answers might be wrong
"SortExec: expr=[c1@0 ASC]",
// SortExec doesn't benefit from input partitioning
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan, 2, true, 10);
@@ -1032,7 +1032,7 @@ mod tests {
"GlobalLimitExec: skip=0, fetch=100",
// Limit doesn't benefit from input partitionins - no parallelism
"LocalLimitExec: fetch=100",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan, 2, true, 10);
@@ -1045,12 +1045,12 @@ mod tests {
let expected = &[
"UnionExec",
- // Union doesn benefit from input partitioning - no parallelism
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
+ // Union doesn't benefit from input partitioning - no parallelism
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan, 2, true, 10);
@@ -1064,7 +1064,7 @@ mod tests {
// parallelization potentially could break sort order
let expected = &[
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];
assert_optimized!(expected, plan, 2, true, 10);
@@ -1081,8 +1081,8 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"UnionExec",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];
assert_optimized!(expected, plan, 2, true, 10);
@@ -1099,7 +1099,7 @@ mod tests {
// no parallelization to preserve sort order
let expected = &[
"SortRequiredExec",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];
assert_optimized!(expected, plan, 2, true, 10);
@@ -1114,7 +1114,7 @@ mod tests {
// data should not be repartitioned / resorted
let expected = &[
"ProjectionExec: expr=[c1@0 as c1]",
- "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
+ "ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];
assert_optimized!(expected, plan, 2, true, 10);
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 1032a16c7f..713eaf288d 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -1748,11 +1748,11 @@ mod tests {
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
" UnionExec",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" GlobalLimitExec: skip=0, fetch=100",
" LocalLimitExec: fetch=100",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
// We should keep the bottom `SortExec`.
@@ -1761,11 +1761,11 @@ mod tests {
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
" UnionExec",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" GlobalLimitExec: skip=0, fetch=100",
" LocalLimitExec: fetch=100",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
@@ -1838,9 +1838,9 @@ mod tests {
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
// should not add a sort at the output of the union, input plan should not be changed
let expected_optimized = expected_input.clone();
@@ -1869,9 +1869,9 @@ mod tests {
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
// should not add a sort at the output of the union, input plan should not be changed
let expected_optimized = expected_input.clone();
@@ -1902,18 +1902,18 @@ mod tests {
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" UnionExec",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" UnionExec",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
@@ -1945,20 +1945,20 @@ mod tests {
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
// should adjust sorting in the first input of the union such that it is not unnecessarily fine
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
@@ -1990,20 +1990,20 @@ mod tests {
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" UnionExec",
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" UnionExec",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
@@ -2043,17 +2043,17 @@ mod tests {
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 DESC NULLS LAST]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
@@ -2089,11 +2089,11 @@ mod tests {
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
// Should adjust the requirement in the third input of the union so
// that it is not unnecessarily fine.
@@ -2101,11 +2101,11 @@ mod tests {
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
@@ -2132,18 +2132,18 @@ mod tests {
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
// Union preserves the inputs ordering and we should not change any of the SortExecs under UnionExec
let expected_output = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
assert_optimized!(expected_input, expected_output, physical_plan);
Ok(())
@@ -2186,16 +2186,16 @@ mod tests {
let expected_input = vec![
"UnionExec",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" SortExec: expr=[nullable_col@0 DESC NULLS LAST,non_nullable_col@1 DESC NULLS LAST]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
// Since `UnionExec` doesn't preserve ordering in the plan above.
// We shouldn't keep SortExecs in the plan.
let expected_optimized = vec![
"UnionExec",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
@@ -2237,16 +2237,16 @@ mod tests {
" SortPreservingMergeExec: [nullable_col@0 DESC NULLS LAST]",
" UnionExec",
" SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC]",
" SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
];
let expected_optimized = vec![
"WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]",
" SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
@@ -2277,16 +2277,16 @@ mod tests {
" SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" UnionExec",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
];
let expected_optimized = vec![
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }], mode=[Sorted]",
" SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
@@ -2327,21 +2327,21 @@ mod tests {
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" GlobalLimitExec: skip=0, fetch=100",
" LocalLimitExec: fetch=100",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 DESC NULLS LAST]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" GlobalLimitExec: skip=0, fetch=100",
" LocalLimitExec: fetch=100",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 DESC NULLS LAST]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
@@ -2385,8 +2385,8 @@ mod tests {
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
join_plan2.as_str(),
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
];
let expected_optimized = match join_type {
JoinType::Inner
@@ -2397,9 +2397,9 @@ mod tests {
vec![
join_plan.as_str(),
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" SortExec: expr=[col_a@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
]
}
_ => {
@@ -2408,9 +2408,9 @@ mod tests {
"SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
join_plan2.as_str(),
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" SortExec: expr=[col_a@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
]
}
};
@@ -2462,8 +2462,8 @@ mod tests {
let expected_input = vec![
spm_plan,
join_plan2.as_str(),
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
];
let expected_optimized = match join_type {
JoinType::Inner | JoinType::Right | JoinType::RightAnti => {
@@ -2471,9 +2471,9 @@ mod tests {
vec![
join_plan.as_str(),
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" SortExec: expr=[col_a@0 ASC,col_b@1 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
]
}
_ => {
@@ -2482,9 +2482,9 @@ mod tests {
"SortExec: expr=[col_a@2 ASC,col_b@3 ASC]",
join_plan2.as_str(),
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" SortExec: expr=[col_a@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
]
}
};
@@ -2519,8 +2519,8 @@ mod tests {
let expected_input = vec![
"SortPreservingMergeExec: [col_b@3 ASC,col_a@2 ASC]",
" SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
];
// can not push down the sort requirements, need to add SortExec
@@ -2528,9 +2528,9 @@ mod tests {
"SortExec: expr=[col_b@3 ASC,col_a@2 ASC]",
" SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" SortExec: expr=[col_a@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -2545,8 +2545,8 @@ mod tests {
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC,col_b@3 ASC,col_a@2 ASC]",
" SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
];
// can not push down the sort requirements, need to add SortExec
@@ -2554,9 +2554,9 @@ mod tests {
"SortExec: expr=[nullable_col@0 ASC,col_b@3 ASC,col_a@2 ASC]",
" SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
" SortExec: expr=[nullable_col@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
" SortExec: expr=[col_a@0 ASC]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[col_a, col_b]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -2627,14 +2627,14 @@ mod tests {
" FilterExec: NOT non_nullable_col@1",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
" FilterExec: NOT non_nullable_col@1",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
- " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 87d3ca1ec7..16dc146750 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -136,12 +136,7 @@ impl ExecutionPlan for AvroExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
- write!(
- f,
- "AvroExec: files={}, limit={:?}",
- super::FileGroupsDisplay(&self.base_config.file_groups),
- self.base_config.limit,
- )
+ write!(f, "AvroExec: {}", self.base_config)
}
}
}
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 886c609e72..e2d2bb8ef7 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -172,11 +172,8 @@ impl ExecutionPlan for CsvExec {
DisplayFormatType::Default => {
write!(
f,
- "CsvExec: files={}, has_header={}, limit={:?}, projection={}",
- super::FileGroupsDisplay(&self.base_config.file_groups),
- self.has_header,
- self.base_config.limit,
- super::ProjectSchemaDisplay(&self.projected_schema),
+ "CsvExec: {}, has_header={}",
+ self.base_config, self.has_header,
)
}
}
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index cef69883e9..d122fd78b4 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -146,12 +146,7 @@ impl ExecutionPlan for NdJsonExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
- write!(
- f,
- "JsonExec: limit={:?}, files={}",
- self.base_config.limit,
- super::FileGroupsDisplay(&self.base_config.file_groups),
- )
+ write!(f, "JsonExec: {}", self.base_config)
}
}
}
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 4f85f54113..6591d83465 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -234,6 +234,29 @@ impl FileScanConfig {
}
}
+impl Display for FileScanConfig {
+ fn fmt(&self, f: &mut Formatter) -> FmtResult {
+ let (schema, _, ordering) = self.project();
+
+ write!(f, "file_groups={}", FileGroupsDisplay(&self.file_groups))?;
+
+ if !schema.fields().is_empty() {
+ write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
+ }
+
+ if let Some(limit) = self.limit {
+ write!(f, ", limit={}", limit)?;
+ }
+
+ if let Some(orders) = ordering {
+ if !orders.is_empty() {
+ write!(f, ", output_ordering={}", OutputOrderingDisplay(&orders))?;
+ }
+ }
+ Ok(())
+ }
+}
+
/// A wrapper to customize partitioned file display
///
/// Prints in the format:
@@ -291,6 +314,23 @@ impl<'a> Display for ProjectSchemaDisplay<'a> {
}
}
+/// A wrapper to customize output ordering display.
+#[derive(Debug)]
+struct OutputOrderingDisplay<'a>(&'a [PhysicalSortExpr]);
+
+impl<'a> Display for OutputOrderingDisplay<'a> {
+ fn fmt(&self, f: &mut Formatter) -> FmtResult {
+ write!(f, "[")?;
+ for (i, e) in self.0.iter().enumerate() {
+ if i > 0 {
+ write!(f, ", ")?
+ }
+ write!(f, "{e}")?;
+ }
+ write!(f, "]")
+ }
+}
+
/// A utility which can adapt file-level record batches to a table schema which may have a schema
/// obtained from merging multiple file-level schemas.
///
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index c18b8fab74..0afb819d43 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -86,7 +86,7 @@ pub struct ParquetExec {
/// Override for `Self::with_enable_page_index`. If None, uses
/// values from base_config
enable_page_index: Option<bool>,
- /// Base configuraton for this scan
+ /// Base configuration for this scan
base_config: FileScanConfig,
projected_statistics: Statistics,
projected_schema: SchemaRef,
@@ -419,20 +419,10 @@ impl ExecutionPlan for ParquetExec {
.map(|pre| format!(", pruning_predicate={}", pre.predicate_expr()))
.unwrap_or_default();
- let output_ordering_string = self
- .output_ordering()
- .map(make_output_ordering_string)
- .unwrap_or_default();
-
write!(
f,
- "ParquetExec: limit={:?}, partitions={}{}{}{}, projection={}",
- self.base_config.limit,
- super::FileGroupsDisplay(&self.base_config.file_groups),
- predicate_string,
- pruning_predicate_string,
- output_ordering_string,
- super::ProjectSchemaDisplay(&self.projected_schema),
+ "ParquetExec: {}{}{}",
+ self.base_config, predicate_string, pruning_predicate_string,
)
}
}
@@ -447,20 +437,6 @@ impl ExecutionPlan for ParquetExec {
}
}
-fn make_output_ordering_string(ordering: &[PhysicalSortExpr]) -> String {
- use std::fmt::Write;
- let mut w: String = ", output_ordering=[".into();
-
- for (i, e) in ordering.iter().enumerate() {
- if i > 0 {
- write!(&mut w, ", ").unwrap()
- }
- write!(&mut w, "{e}").unwrap()
- }
- write!(&mut w, "]").unwrap();
- w
-}
-
/// Implements [`FileOpener`] for a parquet file
struct ParquetOpener {
partition_index: usize,
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 4356d4aa85..9013fb2692 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -2454,10 +2454,10 @@ mod tests {
"SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } }, [...]
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
- // " CsvExec: files={1 group: [[tempdir/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
+ // " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
- // " CsvExec: files={1 group: [[tempdir/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]"
+ // " CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
]
};
let mut actual: Vec<&str> = formatted.trim().lines().collect();
@@ -2507,10 +2507,10 @@ mod tests {
"SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } }, [...]
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
- // " CsvExec: files={1 group: [[tempdir/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
+ // " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
- // " CsvExec: files={1 group: [[tempdir/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]"
+ // " CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
]
};
let mut actual: Vec<&str> = formatted.trim().lines().collect();
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 00850f3237..c8edf701cf 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -328,7 +328,7 @@ pub fn with_new_children_if_necessary(
/// assert_eq!("CoalesceBatchesExec: target_batch_size=8192\
/// \n FilterExec: a@0 < 5\
/// \n RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1\
-/// \n CsvExec: files={1 group: [[WORKING_DIR/tests/data/example.csv]]}, has_header=true, limit=None, projection=[a]",
+/// \n CsvExec: file_groups={1 group: [[WORKING_DIR/tests/data/example.csv]]}, projection=[a], has_header=true",
/// plan_string.trim());
///
/// let one_line = format!("{}", displayable_plan.one_line());
diff --git a/datafusion/core/tests/sql/avro.rs b/datafusion/core/tests/sql/avro.rs
index d933db067d..85ed30044c 100644
--- a/datafusion/core/tests/sql/avro.rs
+++ b/datafusion/core/tests/sql/avro.rs
@@ -149,7 +149,7 @@ async fn avro_explain() {
\n CoalescePartitionsExec\
\n AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
\n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\
- \n AvroExec: files={1 group: [[ARROW_TEST_DATA/avro/alltypes_plain.avro]]}, limit=None\
+ \n AvroExec: file_groups={1 group: [[ARROW_TEST_DATA/avro/alltypes_plain.avro]]}, projection=[id]\
\n",
],
];
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 6f7150d2a5..1ab933022d 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -609,7 +609,7 @@ async fn test_physical_plan_display_indent() {
" CoalesceBatchesExec: target_batch_size=4096",
" FilterExec: c12@1 < 10",
" RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
- " CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c12]",
+ " CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1, c12], has_header=true",
];
let normalizer = ExplainNormalizer::new();
@@ -650,12 +650,12 @@ async fn test_physical_plan_display_indent_multi_children() {
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000), input_partitions=9000",
" RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
- " CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]",
+ " CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 9000), input_partitions=9000",
" RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
" ProjectionExec: expr=[c1@0 as c2]",
- " CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]",
+ " CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true",
];
let normalizer = ExplainNormalizer::new();
diff --git a/datafusion/core/tests/sql/json.rs b/datafusion/core/tests/sql/json.rs
index 965a9c14fc..10fcdfda20 100644
--- a/datafusion/core/tests/sql/json.rs
+++ b/datafusion/core/tests/sql/json.rs
@@ -92,7 +92,7 @@ async fn json_explain() {
\n CoalescePartitionsExec\
\n AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
\n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\
- \n JsonExec: limit=None, files={1 group: [[WORKING_DIR/tests/jsons/2.json]]}\n",
+ \n JsonExec: file_groups={1 group: [[WORKING_DIR/tests/jsons/2.json]]}, projection=[a]\n",
],
];
assert_eq!(expected, actual);
diff --git a/datafusion/core/tests/sqllogictests/test_files/explain.slt b/datafusion/core/tests/sqllogictests/test_files/explain.slt
index fe1d3ac2e4..0fdc9bf61a 100644
--- a/datafusion/core/tests/sqllogictests/test_files/explain.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/explain.slt
@@ -47,7 +47,41 @@ ProjectionExec: expr=[c1@0 as c1]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: c2@1 > 10
RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2], has_header=true
+
+# explain_csv_exec_scan_config
+
+statement ok
+CREATE EXTERNAL TABLE aggregate_test_100_with_order (
+ c1 VARCHAR NOT NULL,
+ c2 TINYINT NOT NULL,
+ c3 SMALLINT NOT NULL,
+ c4 SMALLINT NOT NULL,
+ c5 INTEGER NOT NULL,
+ c6 BIGINT NOT NULL,
+ c7 SMALLINT NOT NULL,
+ c8 INT NOT NULL,
+ c9 INT UNSIGNED NOT NULL,
+ c10 BIGINT UNSIGNED NOT NULL,
+ c11 FLOAT NOT NULL,
+ c12 DOUBLE NOT NULL,
+ c13 VARCHAR NOT NULL
+ )
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (c1 ASC)
+LOCATION '../../testing/data/csv/aggregate_test_100.csv';
+
+query TT
+explain SELECT c1 FROM aggregate_test_100_with_order order by c1 ASC limit 10
+----
+logical_plan
+Limit: skip=0, fetch=10
+ Sort: aggregate_test_100_with_order.c1 ASC NULLS LAST, fetch=10
+ TableScan: aggregate_test_100_with_order projection=[c1]
+physical_plan
+GlobalLimitExec: skip=0, fetch=10
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], output_ordering=[c1@0 ASC NULLS LAST], has_header=true
## explain_physical_plan_only
diff --git a/datafusion/core/tests/sqllogictests/test_files/order.slt b/datafusion/core/tests/sqllogictests/test_files/order.slt
index d42d2cf62f..3b77319c0c 100644
--- a/datafusion/core/tests/sqllogictests/test_files/order.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/order.slt
@@ -165,7 +165,7 @@ Projection: aggregate_test_100.c1, aggregate_test_100.c2
physical_plan
ProjectionExec: expr=[c1@0 as c1, c2@1 as c2]
SortExec: expr=[c2@1 ASC NULLS LAST,c3@2 ASC NULLS LAST]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true
query II
SELECT c2, c3 FROM aggregate_test_100 ORDER BY c2, c3, c2
diff --git a/datafusion/core/tests/sqllogictests/test_files/select.slt b/datafusion/core/tests/sqllogictests/test_files/select.slt
index ff523d713d..26db6cc059 100644
--- a/datafusion/core/tests/sqllogictests/test_files/select.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/select.slt
@@ -661,9 +661,8 @@ select * from largeutf8_data where large_str != '1'
statement ok
CREATE TABLE empty_table;
-query
+statement ok
SELECT * FROM empty_table
-----
# TODO: boolean_literal
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 5ba3644923..66b011eeb9 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -1217,7 +1217,7 @@ ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate
ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as SUM(aggregate_test_100.c9)]
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
SortExec: expr=[c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c8, c9]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c8, c9], has_header=true
# over_order_by_sort_keys_sorting_prefix_compacting
@@ -1238,7 +1238,7 @@ ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate
BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: "MAX(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }], mode=[Sorted]
BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: "MIN(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }], mode=[Sorted]
SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c2, c9]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c9], has_header=true
# FIXME: for now we are not detecting prefix of sorting keys in order to re-arrange with global and save one SortExec
@@ -1263,7 +1263,7 @@ SortExec: expr=[c2@0 ASC NULLS LAST]
SortExec: expr=[c9@1 ASC NULLS LAST,c2@0 ASC NULLS LAST]
BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: "MIN(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }], mode=[Sorted]
SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c2, c9]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c9], has_header=true
# test_window_partition_by_order_by
statement ok
@@ -1293,7 +1293,7 @@ ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_10
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }, Column { name: "c2", index: 1 }], 2), input_partitions=2
RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c4]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c4], has_header=true
# test_window_agg_sort_reversed_plan
@@ -1318,7 +1318,7 @@ ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
SortExec: expr=[c9@0 DESC]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
query III
SELECT
@@ -1359,7 +1359,7 @@ ProjectionExec: expr=[c9@0 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [a
BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: "FIRST_VALUE(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: "LAG(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64, nullable: true, dict_id: 0, dict_is_orde [...]
BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: "FIRST_VALUE(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: "LAG(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64, nullable: true, dict_id: 0, dict_is_or [...]
SortExec: expr=[c9@0 DESC]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
query IIIIIII
SELECT
@@ -1403,7 +1403,7 @@ ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 A
SortExec: expr=[c9@0 ASC NULLS LAST]
BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
SortExec: expr=[c9@0 DESC]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
query III
@@ -1446,7 +1446,7 @@ ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate
BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
SortExec: expr=[c9@2 DESC,c1@0 DESC]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c9]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c9], has_header=true
query IIII
SELECT
@@ -1537,7 +1537,7 @@ ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST]
WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: P [...]
BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
SortExec: expr=[c3@2 DESC,c1@0 ASC NULLS LAST]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/null_cases.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/null_cases.csv]]}, projection=[c1, c2, c3], has_header=true
query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII
SELECT
@@ -1612,7 +1612,7 @@ ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c9]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true
query III
@@ -1656,7 +1656,7 @@ ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggre
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c9]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true
query III
SELECT
@@ -1702,7 +1702,7 @@ ProjectionExec: expr=[c3@0 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate
ProjectionExec: expr=[c3@1 as c3, c4@2 as c4, c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as SUM(aggregate_test_100.c9)]
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }], mode=[Sorted]
SortExec: expr=[c3@1 + c4@2 DESC,c9@3 DESC,c2@0 ASC NULLS LAST]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c2, c3, c4, c9]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3, c4, c9], has_header=true
query III
@@ -1756,7 +1756,7 @@ ProjectionExec: expr=[COUNT(UInt8(1))@0 as global_count]
CoalesceBatchesExec: target_batch_size=4096
FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434
RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c13]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], has_header=true
query I
SELECT count(*) as global_count FROM
@@ -1802,7 +1802,7 @@ GlobalLimitExec: skip=0, fetch=5
ProjectionExec: expr=[c3@1 as c3, c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(aggregate_test_100.c9)]
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }], mode=[Sorted]
SortExec: expr=[c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c2, c3, c9]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3, c9], has_header=true
@@ -1841,7 +1841,7 @@ SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true
query TI
SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC
@@ -1968,7 +1968,7 @@ SortExec: expr=[c1@0 ASC NULLS LAST]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true
statement ok
set datafusion.optimizer.repartition_sorts = true;
@@ -1997,7 +1997,7 @@ SortExec: expr=[c1@0 ASC NULLS LAST]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2
RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c9]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true
# test_window_agg_with_global_limit
statement ok
@@ -2020,7 +2020,7 @@ ProjectionExec: expr=[ARRAYAGG(aggregate_test_100.c13)@0 as array_agg1]
RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
GlobalLimitExec: skip=0, fetch=1
SortExec: fetch=1, expr=[c13@0 ASC NULLS LAST]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c13]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c13], has_header=true
query ?
@@ -2087,7 +2087,7 @@ GlobalLimitExec: skip=0, fetch=5
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }], mode=[Sorted]
WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3 ASC NULLS LAST,c8@2 ASC NULLS LAST]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c8, c9]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c8, c9], has_header=true
@@ -2146,7 +2146,7 @@ ProjectionExec: expr=[c9@1 as c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER B
WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3 ASC NULLS LAST,c8@2 ASC NULLS LAST]
ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c8@2 as c8, c9@3 as c9, c1@0 as c1_alias]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c8, c9]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c8, c9], has_header=true
@@ -2192,7 +2192,7 @@ ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2]
ProjectionExec: expr=[c1@0 as c1, c9@2 as c9, c12@3 as c12, SUM(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as SUM(aggregate_test_100.c12)]
BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c12): Ok(Field { name: "SUM(aggregate_test_100.c12)", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Groups, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c9, c12]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c9, c12], has_header=true
query RR
SELECT SUM(c12) OVER(ORDER BY c1, c2 GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as sum1,
@@ -2228,7 +2228,7 @@ GlobalLimitExec: skip=0, fetch=5
ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
SortExec: expr=[c9@0 ASC NULLS LAST]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
query II
SELECT c9, rn1 FROM (SELECT c9,
@@ -2267,7 +2267,7 @@ GlobalLimitExec: skip=0, fetch=5
ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
SortExec: expr=[c9@0 DESC]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
query II
SELECT c9, rn1 FROM (SELECT c9,
@@ -2307,7 +2307,7 @@ GlobalLimitExec: skip=0, fetch=5
ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
SortExec: expr=[c9@0 DESC]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
query II
SELECT c9, rn1 FROM (SELECT c9,
@@ -2350,7 +2350,7 @@ GlobalLimitExec: skip=0, fetch=5
ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
SortExec: expr=[c9@0 DESC]
- CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
query II
SELECT c9, rn1 FROM (SELECT c9,
diff --git a/docs/source/user-guide/sql/explain.md b/docs/source/user-guide/sql/explain.md
index ca4169d577..b240b14eb5 100644
--- a/docs/source/user-guide/sql/explain.md
+++ b/docs/source/user-guide/sql/explain.md
@@ -44,7 +44,7 @@ EXPLAIN SELECT SUM(x) FROM table GROUP BY b;
| | RepartitionExec: partitioning=Hash([Column { name: "b", index: 0 }], 16) |
| | AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[SUM(table.x)] |
| | RepartitionExec: partitioning=RoundRobinBatch(16) |
-| | CsvExec: source=Path(/tmp/table.csv: [/tmp/table.csv]), has_header=false, limit=None, projection=[x, b] |
+| | CsvExec: file_groups={1 group: [[/tmp/table.csv]]}, projection=[x, b], has_header=false |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
@@ -66,6 +66,6 @@ EXPLAIN ANALYZE SELECT SUM(x) FROM table GROUP BY b;
| | RepartitionExec: partitioning=Hash([Column { name: "b", index: 0 }], 16), metrics=[sendTime=839560, fetchTime=122528525, repartitionTime=5327877] |
| | HashAggregateExec: mode=Partial, gby=[b@1 as b], aggr=[SUM(x)], metrics=[outputRows=2] |
| | RepartitionExec: partitioning=RoundRobinBatch(16), metrics=[fetchTime=5660489, repartitionTime=0, sendTime=8012] |
-| | CsvExec: source=Path(/tmp/table.csv: [/tmp/table.csv]), has_header=false, metrics=[] |
+| | CsvExec: file_groups={1 group: [[/tmp/table.csv]]}, has_header=false, metrics=[] |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
```