Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/19 14:32:59 UTC
[arrow-datafusion] branch master updated: refactor: display input partitions for `RepartitionExec` (#4969)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 19f6f19c1 refactor: display input partitions for `RepartitionExec` (#4969)
19f6f19c1 is described below
commit 19f6f19c1d4783f9bcfde83744ee436f8e984154
Author: Marco Neumann <ma...@crepererum.net>
AuthorDate: Thu Jan 19 14:32:52 2023 +0000
refactor: display input partitions for `RepartitionExec` (#4969)
Closes #4964.
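For context, the effect of the change is that `RepartitionExec`'s default display line now also reports how many partitions its input produces, so `EXPLAIN` output distinguishes a repartition fed by a single partition from one fed by many. The following is a minimal, self-contained sketch of that formatting logic only; it uses an illustrative `Partitioning` enum and a stand-in `MockInput` struct with `std::fmt::Display`, rather than DataFusion's actual `Partitioning`, `ExecutionPlan`, and `fmt_as` types (see the real change in datafusion/core/src/physical_plan/repartition/mod.rs below).

    use std::fmt;

    /// Stand-in for DataFusion's `Partitioning` enum (illustrative only).
    #[derive(Debug)]
    enum Partitioning {
        RoundRobinBatch(usize),
        Hash(Vec<String>, usize),
    }

    /// Stand-in for the operator's input, which knows its own output partition count.
    struct MockInput {
        output_partitions: usize,
    }

    /// Simplified repartition operator holding its partitioning scheme and its input.
    struct RepartitionExec {
        partitioning: Partitioning,
        input: MockInput,
    }

    impl fmt::Display for RepartitionExec {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            // Mirrors the new default-format arm: the partitioning scheme is
            // printed with `{:?}`, followed by the input's partition count.
            write!(
                f,
                "RepartitionExec: partitioning={:?}, input_partitions={}",
                self.partitioning, self.input.output_partitions
            )
        }
    }

    fn main() {
        let exec = RepartitionExec {
            partitioning: Partitioning::RoundRobinBatch(10),
            input: MockInput { output_partitions: 1 },
        };
        // Prints: RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
        println!("{exec}");
    }

This is why every expected plan string in the test diffs below gains a ", input_partitions=N" suffix on its RepartitionExec lines.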
---
.../src/physical_optimizer/dist_enforcement.rs | 112 ++++++++++-----------
.../core/src/physical_optimizer/repartition.rs | 28 +++---
datafusion/core/src/physical_plan/mod.rs | 2 +-
.../core/src/physical_plan/repartition/mod.rs | 7 +-
datafusion/core/tests/sql/avro.rs | 2 +-
datafusion/core/tests/sql/explain_analyze.rs | 14 +--
datafusion/core/tests/sql/joins.rs | 112 ++++++++++-----------
datafusion/core/tests/sql/json.rs | 2 +-
datafusion/core/tests/sql/window.rs | 36 +++----
9 files changed, 160 insertions(+), 155 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index cc94ad14a..c6c2bd40e 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -1223,25 +1223,25 @@ mod tests {
JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => vec![
top_join_plan.as_str(),
join_plan.as_str(),
- "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// Should include 4 RepartitionExecs
_ => vec![
top_join_plan.as_str(),
- "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=10",
join_plan.as_str(),
- "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
};
@@ -1279,26 +1279,26 @@ mod tests {
vec![
top_join_plan.as_str(),
join_plan.as_str(),
- "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// Should include 4 RepartitionExecs
_ =>
vec![
top_join_plan.as_str(),
- "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 6 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 6 }], 10), input_partitions=10",
join_plan.as_str(),
- "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
};
@@ -1348,11 +1348,11 @@ mod tests {
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a1\", index: 0 }, Column { name: \"c\", index: 2 })]",
"ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]",
- "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, top_join);
@@ -1370,11 +1370,11 @@ mod tests {
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"c\", index: 2 })]",
"ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]",
- "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
@@ -1417,15 +1417,15 @@ mod tests {
// The original Output partition can not satisfy the Join requirements and need to add an additional RepartitionExec
let expected = &[
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"c\", index: 2 })]",
- "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=10",
"ProjectionExec: expr=[c1@0 as a]",
"ProjectionExec: expr=[c@2 as c1]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]",
- "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
@@ -1457,11 +1457,11 @@ mod tests {
let expected = &[
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a1\", index: 0 }, Column { name: \"a2\", index: 0 })]",
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
- "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
"AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
- "RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 0 }], 10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[a@0 as a2], aggr=[]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
@@ -1506,11 +1506,11 @@ mod tests {
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"b1\", index: 1 }, Column { name: \"b\", index: 0 }), (Column { name: \"a1\", index: 0 }, Column { name: \"a\", index: 1 })]",
"ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
"AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 0 }, Column { name: \"a1\", index: 1 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 0 }, Column { name: \"a1\", index: 1 }], 10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
"AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 0 }, Column { name: \"a\", index: 1 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 0 }, Column { name: \"a\", index: 1 }], 10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
@@ -1613,15 +1613,15 @@ mod tests {
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"B\", index: 2 }, Column { name: \"b1\", index: 6 }), (Column { name: \"C\", index: 3 }, Column { name: \"c\", index: 2 }), (Column { name: \"AA\", index: 1 }, Column { name: \"a1\", index: 5 })]",
"ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }, Column { name: \"a\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }, Column { name: \"a1\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }, Column { name: \"a\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }, Column { name: \"a1\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
@@ -1733,15 +1733,15 @@ mod tests {
top_join_plan.as_str(),
"ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 })]",
- "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }, Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }, Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]",
- "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }, Column { name: \"b\", index: 1 }, Column { name: \"a\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }, Column { name: \"b\", index: 1 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 2 }, Column { name: \"b1\", index: 1 }, Column { name: \"a1\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 2 }, Column { name: \"b1\", index: 1 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
@@ -1852,15 +1852,15 @@ mod tests {
top_join_plan.as_str(),
"ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 })]",
- "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }, Column { name: \"b1\", index: 1 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }, Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]",
- "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }, Column { name: \"b\", index: 1 }, Column { name: \"a\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }, Column { name: \"b\", index: 1 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
- "RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 2 }, Column { name: \"b1\", index: 1 }, Column { name: \"a1\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 2 }, Column { name: \"b1\", index: 1 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
@@ -1926,31 +1926,31 @@ mod tests {
top_join_plan.as_str(),
join_plan.as_str(),
"SortExec: [a@0 ASC]",
- "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: [b1@1 ASC]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: [c@2 ASC]",
- "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// Should include 4 RepartitionExecs
_ => vec![
top_join_plan.as_str(),
"SortExec: [a@0 ASC]",
- "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=10",
join_plan.as_str(),
"SortExec: [a@0 ASC]",
- "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: [b1@1 ASC]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: [c@2 ASC]",
- "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
};
@@ -1979,31 +1979,31 @@ mod tests {
top_join_plan.as_str(),
join_plan.as_str(),
"SortExec: [a@0 ASC]",
- "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: [b1@1 ASC]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: [c@2 ASC]",
- "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// Should include 4 RepartitionExecs and 4 SortExecs
_ => vec![
top_join_plan.as_str(),
"SortExec: [b1@6 ASC]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 6 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 6 }], 10), input_partitions=10",
join_plan.as_str(),
"SortExec: [a@0 ASC]",
- "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: [b1@1 ASC]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: [c@2 ASC]",
- "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
};
@@ -2069,13 +2069,13 @@ mod tests {
"ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]",
"ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
"AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 0 }, Column { name: \"a1\", index: 1 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 0 }, Column { name: \"a1\", index: 1 }], 10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: [b2@1 ASC,a2@0 ASC]",
"ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
"AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]",
- "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 0 }, Column { name: \"a\", index: 1 }], 10)",
+ "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 0 }, Column { name: \"a\", index: 1 }], 10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 81f6b2eda..8595aa8ab 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -495,7 +495,7 @@ mod tests {
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10)",
+ "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];
@@ -512,7 +512,7 @@ mod tests {
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"FilterExec: c1@0",
- "RepartitionExec: partitioning=RoundRobinBatch(10)",
+ "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];
@@ -530,7 +530,7 @@ mod tests {
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
// nothing sorts the data, so the local limit doesn't require sorted data either
- "RepartitionExec: partitioning=RoundRobinBatch(10)",
+ "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];
@@ -548,7 +548,7 @@ mod tests {
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
// nothing sorts the data, so the local limit doesn't require sorted data either
- "RepartitionExec: partitioning=RoundRobinBatch(10)",
+ "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];
@@ -598,13 +598,13 @@ mod tests {
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10)",
+ "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"GlobalLimitExec: skip=0, fetch=100",
"CoalescePartitionsExec",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
// repartition should happen prior to the filter to maximize parallelism
- "RepartitionExec: partitioning=RoundRobinBatch(10)",
+ "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
// Expect no repartition to happen for local limit
@@ -625,13 +625,13 @@ mod tests {
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10)",
+ "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"GlobalLimitExec: skip=5, fetch=100",
"CoalescePartitionsExec",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
// repartition should happen prior to the filter to maximize parallelism
- "RepartitionExec: partitioning=RoundRobinBatch(10)",
+ "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
// Expect no repartition to happen for local limit
@@ -671,7 +671,7 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"SortExec: [c1@0 ASC]",
- "RepartitionExec: partitioning=RoundRobinBatch(10)",
+ "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];
@@ -754,7 +754,7 @@ mod tests {
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
// union input 2: should repartition
"FilterExec: c1@0",
- "RepartitionExec: partitioning=RoundRobinBatch(10)",
+ "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];
@@ -772,7 +772,7 @@ mod tests {
"SortPreservingMergeExec: [c1@0 ASC]",
"SortExec: [c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
- "RepartitionExec: partitioning=RoundRobinBatch(10)",
+ "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];
@@ -806,7 +806,7 @@ mod tests {
// Expect repartition on the input to the sort (as it can benefit from additional parallelism)
"SortExec: [c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
- "RepartitionExec: partitioning=RoundRobinBatch(10)",
+ "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];
@@ -824,7 +824,7 @@ mod tests {
// Expect repartition on the input to the sort (as it can benefit from additional parallelism)
"SortExec: [c1@0 ASC]",
"FilterExec: c1@0",
- "RepartitionExec: partitioning=RoundRobinBatch(10)",
+ "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];
@@ -846,7 +846,7 @@ mod tests {
"ProjectionExec: expr=[c1@0 as c1]",
"FilterExec: c1@0",
// repartition is lowest down
- "RepartitionExec: partitioning=RoundRobinBatch(10)",
+ "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 0417814f3..fc6b4066c 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -322,7 +322,7 @@ pub fn with_new_children_if_necessary(
/// assert_eq!("ProjectionExec: expr=[a@0 as a]\
/// \n CoalesceBatchesExec: target_batch_size=8192\
/// \n FilterExec: a@0 < 5\
-/// \n RepartitionExec: partitioning=RoundRobinBatch(3)\
+/// \n RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1\
/// \n CsvExec: files={1 group: [[WORKING_DIR/tests/data/example.csv]]}, has_header=true, limit=None, projection=[a]",
/// plan_string.trim());
///
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index 1d0f1fe5c..8b9d44a99 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -445,7 +445,12 @@ impl ExecutionPlan for RepartitionExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
- write!(f, "RepartitionExec: partitioning={:?}", self.partitioning)
+ write!(
+ f,
+ "RepartitionExec: partitioning={:?}, input_partitions={}",
+ self.partitioning,
+ self.input.output_partitioning().partition_count()
+ )
}
}
}
diff --git a/datafusion/core/tests/sql/avro.rs b/datafusion/core/tests/sql/avro.rs
index 202a175b2..58b318abd 100644
--- a/datafusion/core/tests/sql/avro.rs
+++ b/datafusion/core/tests/sql/avro.rs
@@ -150,7 +150,7 @@ async fn avro_explain() {
\n AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
\n CoalescePartitionsExec\
\n AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
- \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
+ \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\
\n AvroExec: files={1 group: [[ARROW_TEST_DATA/avro/alltypes_plain.avro]]}, limit=None\
\n",
],
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 1e4f081f4..1644ca4e7 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -612,11 +612,11 @@ async fn test_physical_plan_display_indent() {
" ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]",
" AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000), input_partitions=9000",
" AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
" FilterExec: c12@1 < 10",
- " RepartitionExec: partitioning=RoundRobinBatch(9000)",
+ " RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
" CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c12]",
];
@@ -656,14 +656,14 @@ async fn test_physical_plan_display_indent_multi_children() {
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"c1\", index: 0 }, Column { name: \"c2\", index: 0 })]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000), input_partitions=9000",
" ProjectionExec: expr=[c1@0 as c1]",
- " RepartitionExec: partitioning=RoundRobinBatch(9000)",
+ " RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
" CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 9000)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 9000), input_partitions=9000",
" ProjectionExec: expr=[c1@0 as c2]",
- " RepartitionExec: partitioning=RoundRobinBatch(9000)",
+ " RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
" CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]",
];
@@ -705,7 +705,7 @@ async fn csv_explain() {
"ProjectionExec: expr=[c1@0 as c1]\
\n CoalesceBatchesExec: target_batch_size=4096\
\n FilterExec: c2@1 > 10\
- \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
+ \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\
\n CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2]\
\n",
]];
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index c73a3ad20..ac02a0d57 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -1883,13 +1883,13 @@ async fn sort_merge_join_on_date32() -> Result<()> {
" SortMergeJoin: join_type=Inner, on=[(Column { name: \"c1\", index: 0 }, Column { name: \"c1\", index: 0 })]",
" SortExec: [c1@0 ASC]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2)",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
" SortExec: [c1@0 ASC]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2)",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
];
let formatted = displayable(physical_plan.as_ref()).indent().to_string();
@@ -1929,14 +1929,14 @@ async fn sort_merge_join_on_decimal() -> Result<()> {
" SortMergeJoin: join_type=Right, on=[(Column { name: \"CAST(t1.c3 AS Decimal128(10, 2))\", index: 4 }, Column { name: \"c3\", index: 2 })]",
" SortExec: [CAST(t1.c3 AS Decimal128(10, 2))@4 ASC]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"CAST(t1.c3 AS Decimal128(10, 2))\", index: 4 }], 2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"CAST(t1.c3 AS Decimal128(10, 2))\", index: 4 }], 2), input_partitions=2",
" ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, CAST(c3@2 AS Decimal128(10, 2)) as CAST(t1.c3 AS Decimal128(10, 2))]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
" SortExec: [c3@2 ASC]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"c3\", index: 2 }], 2)",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"c3\", index: 2 }], 2), input_partitions=2",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
];
let formatted = displayable(physical_plan.as_ref()).indent().to_string();
@@ -1986,13 +1986,13 @@ async fn left_semi_join() -> Result<()> {
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
" ProjectionExec: expr=[t2_id@0 as t2_id]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
} else {
@@ -2004,7 +2004,7 @@ async fn left_semi_join() -> Result<()> {
" HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
" MemoryExec: partitions=1, partition_sizes=[1]",
" ProjectionExec: expr=[t2_id@0 as t2_id]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
};
@@ -2068,12 +2068,12 @@ async fn left_semi_join() -> Result<()> {
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2)",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
} else {
@@ -2081,7 +2081,7 @@ async fn left_semi_join() -> Result<()> {
"SortExec: [t1_id@0 ASC NULLS LAST]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
" MemoryExec: partitions=1, partition_sizes=[1]",
@@ -2237,12 +2237,12 @@ async fn right_semi_join() -> Result<()> {
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 1 }, op: NotEq, right: Column { name: \"t1_name\", index: 0 } }",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2)",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
} else {
@@ -2250,7 +2250,7 @@ async fn right_semi_join() -> Result<()> {
"SortExec: [t1_id@0 ASC NULLS LAST]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 1 }, op: NotEq, right: Column { name: \"t1_name\", index: 0 } }",
" MemoryExec: partitions=1, partition_sizes=[1]",
@@ -2285,12 +2285,12 @@ async fn right_semi_join() -> Result<()> {
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 0 }, op: NotEq, right: Column { name: \"t1_name\", index: 1 } }",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2)",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
} else {
@@ -2298,7 +2298,7 @@ async fn right_semi_join() -> Result<()> {
"SortExec: [t1_id@0 ASC NULLS LAST]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 0 }, op: NotEq, right: Column { name: \"t1_name\", index: 1 } }",
" MemoryExec: partitions=1, partition_sizes=[1]",
@@ -2474,14 +2474,14 @@ async fn reduce_cross_join_with_wildcard_and_expr() -> Result<()> {
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(11)\", index: 3 }, Column { name: \"CAST(t2.t2_id AS Int64)\", index: 3 })]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + Int64(11)\", index: 3 }], 2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + Int64(11)\", index: 3 }], 2), input_partitions=2",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as t1.t1_id + Int64(11)]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"CAST(t2.t2_id AS Int64)\", index: 3 }], 2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"CAST(t2.t2_id AS Int64)\", index: 3 }], 2), input_partitions=2",
" ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, CAST(t2_id@0 AS Int64) as CAST(t2.t2_id AS Int64)]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
} else {
@@ -2492,10 +2492,10 @@ async fn reduce_cross_join_with_wildcard_and_expr() -> Result<()> {
" HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(11)\", index: 3 }, Column { name: \"CAST(t2.t2_id AS Int64)\", index: 3 })]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as t1.t1_id + Int64(11)]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
" ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, CAST(t2_id@0 AS Int64) as CAST(t2.t2_id AS Int64)]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
};
@@ -2546,14 +2546,14 @@ async fn both_side_expr_key_inner_join() -> Result<()> {
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(12)\", index: 2 }, Column { name: \"t2.t2_id + UInt32(1)\", index: 1 })]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + UInt32(12)\", index: 2 }], 2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + UInt32(12)\", index: 2 }], 2), input_partitions=2",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as t1.t1_id + UInt32(12)]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id + UInt32(1)\", index: 1 }], 2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id + UInt32(1)\", index: 1 }], 2), input_partitions=2",
" ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as t2.t2_id + UInt32(1)]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
} else {
@@ -2564,10 +2564,10 @@ async fn both_side_expr_key_inner_join() -> Result<()> {
" HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(12)\", index: 2 }, Column { name: \"t2.t2_id + UInt32(1)\", index: 1 })]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as t1.t1_id + UInt32(12)]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
" ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as t2.t2_id + UInt32(1)]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
};
@@ -2617,25 +2617,25 @@ async fn left_side_expr_key_inner_join() -> Result<()> {
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }], 2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }], 2), input_partitions=2",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as t1.t1_id + UInt32(11)]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2)",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
} else {
vec![
"ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as t1.t1_id + UInt32(11)]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
@@ -2686,13 +2686,13 @@ async fn right_side_expr_key_inner_join() -> Result<()> {
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 1 })]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - UInt32(11)\", index: 1 }], 2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - UInt32(11)\", index: 1 }], 2), input_partitions=2",
" ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
} else {
@@ -2703,7 +2703,7 @@ async fn right_side_expr_key_inner_join() -> Result<()> {
" HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 1 })]",
" MemoryExec: partitions=1, partition_sizes=[1]",
" ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
};
@@ -2753,13 +2753,13 @@ async fn select_wildcard_with_expr_key_inner_join() -> Result<()> {
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 3 })]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - UInt32(11)\", index: 3 }], 2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - UInt32(11)\", index: 3 }], 2), input_partitions=2",
" ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
} else {
@@ -2770,7 +2770,7 @@ async fn select_wildcard_with_expr_key_inner_join() -> Result<()> {
" HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 3 })]",
" MemoryExec: partitions=1, partition_sizes=[1]",
" ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
};
diff --git a/datafusion/core/tests/sql/json.rs b/datafusion/core/tests/sql/json.rs
index b6b847382..26ddff61d 100644
--- a/datafusion/core/tests/sql/json.rs
+++ b/datafusion/core/tests/sql/json.rs
@@ -93,7 +93,7 @@ async fn json_explain() {
\n AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
\n CoalescePartitionsExec\
\n AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
- \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
+ \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\
\n JsonExec: limit=None, files={1 group: [[WORKING_DIR/tests/jsons/2.json]]}\n",
],
];
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index 1167d57a4..24d86d527 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -1682,7 +1682,7 @@ async fn test_window_agg_sort() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum2]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]",
@@ -1714,7 +1714,7 @@ async fn over_order_by_sort_keys_sorting_prefix_compacting() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MI [...]
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
" WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
@@ -1749,7 +1749,7 @@ async fn over_order_by_sort_keys_sorting_global_order_compacting() -> Result<()>
"SortExec: [c2@0 ASC NULLS LAST]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN U [...]
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
" WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" SortExec: [c9@1 ASC NULLS LAST,c2@0 ASC NULLS LAST]",
@@ -1792,12 +1792,12 @@ async fn test_window_partition_by_order_by() -> Result<()> {
" BoundedWindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
" SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
" SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }, Column { name: \"c2\", index: 1 }], 2)",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }, Column { name: \"c2\", index: 1 }], 2), input_partitions=2",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
]
};
@@ -1830,7 +1830,7 @@ async fn test_window_agg_sort_reversed_plan() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" GlobalLimitExec: skip=0, fetch=5",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
@@ -1886,7 +1886,7 @@ async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c9@0 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as lag1, LAG(aggregate_tes [...]
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" GlobalLimitExec: skip=0, fetch=5",
" BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_ [...]
" BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dic [...]
@@ -1938,7 +1938,7 @@ async fn test_window_agg_sort_non_reversed_plan() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as rn2]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" GlobalLimitExec: skip=0, fetch=5",
" BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
" SortExec: [c9@0 ASC NULLS LAST]",
@@ -1992,7 +1992,7 @@ async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@5 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWE [...]
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" GlobalLimitExec: skip=0, fetch=5",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
" SortExec: [c9@2 ASC NULLS LAST,c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
@@ -2080,7 +2080,7 @@ async fn test_window_agg_complex_plan() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@19 as a, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@19 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@4 as c, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@12 as d, SUM(nul [...]
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" GlobalLimitExec: skip=0, fetch=5",
" WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Prec [...]
" WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
@@ -2129,7 +2129,7 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()>
let expected = {
vec![
"ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" GlobalLimitExec: skip=0, fetch=5",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
@@ -2184,7 +2184,7 @@ async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum2]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" GlobalLimitExec: skip=0, fetch=5",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
@@ -2238,7 +2238,7 @@ async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as sum2]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" GlobalLimitExec: skip=0, fetch=5",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_o [...]
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED [...]
@@ -2297,15 +2297,15 @@ async fn test_remove_unnecessary_sort_in_sub_query() -> Result<()> {
" AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]",
" CoalescePartitionsExec",
" AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]",
- " RepartitionExec: partitioning=RoundRobinBatch(8)",
+ " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CoalescePartitionsExec",
" AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 8)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 8), input_partitions=8",
" AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]",
" CoalesceBatchesExec: target_batch_size=4096",
" FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
- " RepartitionExec: partitioning=RoundRobinBatch(8)",
+ " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
]
};
@@ -2351,7 +2351,7 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
let expected = {
vec![
"ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" GlobalLimitExec: skip=0, fetch=5",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]",