You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/19 14:32:59 UTC

[arrow-datafusion] branch master updated: refactor: display input partitions for `RepartitionExec` (#4969)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 19f6f19c1 refactor: display input partitions for `RepartitionExec` (#4969)
19f6f19c1 is described below

commit 19f6f19c1d4783f9bcfde83744ee436f8e984154
Author: Marco Neumann <ma...@crepererum.net>
AuthorDate: Thu Jan 19 14:32:52 2023 +0000

    refactor: display input partitions for `RepartitionExec` (#4969)
    
    Closes #4964.
---
 .../src/physical_optimizer/dist_enforcement.rs     | 112 ++++++++++-----------
 .../core/src/physical_optimizer/repartition.rs     |  28 +++---
 datafusion/core/src/physical_plan/mod.rs           |   2 +-
 .../core/src/physical_plan/repartition/mod.rs      |   7 +-
 datafusion/core/tests/sql/avro.rs                  |   2 +-
 datafusion/core/tests/sql/explain_analyze.rs       |  14 +--
 datafusion/core/tests/sql/joins.rs                 | 112 ++++++++++-----------
 datafusion/core/tests/sql/json.rs                  |   2 +-
 datafusion/core/tests/sql/window.rs                |  36 +++----
 9 files changed, 160 insertions(+), 155 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index cc94ad14a..c6c2bd40e 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -1223,25 +1223,25 @@ mod tests {
                         JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => vec![
                             top_join_plan.as_str(),
                             join_plan.as_str(),
-                            "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+                            "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
                             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-                            "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10)",
+                            "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
                             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
                             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-                            "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+                            "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
                             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
                         ],
                         // Should include 4 RepartitionExecs
                         _ => vec![
                             top_join_plan.as_str(),
-                            "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+                            "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=10",
                             join_plan.as_str(),
-                            "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+                            "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
                             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-                            "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10)",
+                            "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
                             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
                             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-                            "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+                            "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
                             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
                         ],
                     };
@@ -1279,26 +1279,26 @@ mod tests {
                             vec![
                                 top_join_plan.as_str(),
                                 join_plan.as_str(),
-                                "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+                                "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
                                 "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-                                "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10)",
+                                "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
                                 "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
                                 "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-                                "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+                                "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
                                 "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
                             ],
                         // Should include 4 RepartitionExecs
                         _ =>
                             vec![
                                 top_join_plan.as_str(),
-                                "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 6 }], 10)",
+                                "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 6 }], 10), input_partitions=10",
                                 join_plan.as_str(),
-                                "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+                                "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
                                 "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-                                "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10)",
+                                "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
                                 "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
                                 "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-                                "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+                                "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
                                 "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
                             ],
                     };
@@ -1348,11 +1348,11 @@ mod tests {
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a1\", index: 0 }, Column { name: \"c\", index: 2 })]",
             "ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
         ];
         assert_optimized!(expected, top_join);
@@ -1370,11 +1370,11 @@ mod tests {
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"c\", index: 2 })]",
             "ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
         ];
 
@@ -1417,15 +1417,15 @@ mod tests {
         // The original Output partition can not satisfy the Join requirements and need to add an additional RepartitionExec
         let expected = &[
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"c\", index: 2 })]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=10",
             "ProjectionExec: expr=[c1@0 as a]",
             "ProjectionExec: expr=[c@2 as c1]",
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }], 10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
         ];
 
@@ -1457,11 +1457,11 @@ mod tests {
         let expected = &[
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a1\", index: 0 }, Column { name: \"a2\", index: 0 })]",
             "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
             "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
             "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 0 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 0 }], 10), input_partitions=1",
             "AggregateExec: mode=Partial, gby=[a@0 as a2], aggr=[]",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
         ];
@@ -1506,11 +1506,11 @@ mod tests {
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"b1\", index: 1 }, Column { name: \"b\", index: 0 }), (Column { name: \"a1\", index: 0 }, Column { name: \"a\", index: 1 })]",
             "ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
             "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 0 }, Column { name: \"a1\", index: 1 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 0 }, Column { name: \"a1\", index: 1 }], 10), input_partitions=1",
             "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
             "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 0 }, Column { name: \"a\", index: 1 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 0 }, Column { name: \"a\", index: 1 }], 10), input_partitions=1",
             "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
         ];
@@ -1613,15 +1613,15 @@ mod tests {
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"B\", index: 2 }, Column { name: \"b1\", index: 6 }), (Column { name: \"C\", index: 3 }, Column { name: \"c\", index: 2 }), (Column { name: \"AA\", index: 1 }, Column { name: \"a1\", index: 5 })]",
             "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }, Column { name: \"a\", index: 0 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }, Column { name: \"a1\", index: 0 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }, Column { name: \"a\", index: 0 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }, Column { name: \"a1\", index: 0 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
         ];
@@ -1733,15 +1733,15 @@ mod tests {
                 top_join_plan.as_str(),
                 "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
                 "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 })]",
-                "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }], 10)",
+                "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 }], 10), input_partitions=1",
                 "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-                "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }, Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }], 10)",
+                "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }, Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 }], 10), input_partitions=1",
                 "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
                 "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
                 "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]",
-                "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }, Column { name: \"b\", index: 1 }, Column { name: \"a\", index: 0 }], 10)",
+                "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }, Column { name: \"b\", index: 1 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1",
                 "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-                "RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 2 }, Column { name: \"b1\", index: 1 }, Column { name: \"a1\", index: 0 }], 10)",
+                "RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 2 }, Column { name: \"b1\", index: 1 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
                 "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
                 "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
             ];
@@ -1852,15 +1852,15 @@ mod tests {
                 top_join_plan.as_str(),
                 "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
                 "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 })]",
-                "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 }], 10)",
+                "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }, Column { name: \"b\", index: 1 }], 10), input_partitions=1",
                 "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-                "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }, Column { name: \"b1\", index: 1 }], 10)",
+                "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }, Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
                 "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
                 "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
                 "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 })]",
-                "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }, Column { name: \"b\", index: 1 }, Column { name: \"a\", index: 0 }], 10)",
+                "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }, Column { name: \"b\", index: 1 }, Column { name: \"a\", index: 0 }], 10), input_partitions=1",
                 "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
-                "RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 2 }, Column { name: \"b1\", index: 1 }, Column { name: \"a1\", index: 0 }], 10)",
+                "RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 2 }, Column { name: \"b1\", index: 1 }, Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
                 "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
                 "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
             ];
@@ -1926,31 +1926,31 @@ mod tests {
                         top_join_plan.as_str(),
                         join_plan.as_str(),
                         "SortExec: [a@0 ASC]",
-                        "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+                        "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
                         "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
                         "SortExec: [b1@1 ASC]",
-                        "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10)",
+                        "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
                         "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
                         "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
                         "SortExec: [c@2 ASC]",
-                        "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+                        "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
                         "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
                     ],
                 // Should include 4 RepartitionExecs
                 _ => vec![
                         top_join_plan.as_str(),
                         "SortExec: [a@0 ASC]",
-                        "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+                        "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=10",
                         join_plan.as_str(),
                         "SortExec: [a@0 ASC]",
-                        "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+                        "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
                         "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
                         "SortExec: [b1@1 ASC]",
-                        "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10)",
+                        "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
                         "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
                         "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
                         "SortExec: [c@2 ASC]",
-                        "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+                        "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
                         "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
                 ],
             };
@@ -1979,31 +1979,31 @@ mod tests {
                             top_join_plan.as_str(),
                             join_plan.as_str(),
                             "SortExec: [a@0 ASC]",
-                            "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+                            "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
                             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
                             "SortExec: [b1@1 ASC]",
-                            "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10)",
+                            "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
                             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
                             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
                             "SortExec: [c@2 ASC]",
-                            "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+                            "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
                             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
                         ],
                         // Should include 4 RepartitionExecs and 4 SortExecs
                         _ => vec![
                             top_join_plan.as_str(),
                             "SortExec: [b1@6 ASC]",
-                            "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 6 }], 10)",
+                            "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 6 }], 10), input_partitions=10",
                             join_plan.as_str(),
                             "SortExec: [a@0 ASC]",
-                            "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10)",
+                            "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 0 }], 10), input_partitions=1",
                             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
                             "SortExec: [b1@1 ASC]",
-                            "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10)",
+                            "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 1 }], 10), input_partitions=1",
                             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
                             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
                             "SortExec: [c@2 ASC]",
-                            "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10)",
+                            "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 2 }], 10), input_partitions=1",
                             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
                         ],
                     };
@@ -2069,13 +2069,13 @@ mod tests {
             "ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]",
             "ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
             "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 0 }, Column { name: \"a1\", index: 1 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 0 }, Column { name: \"a1\", index: 1 }], 10), input_partitions=1",
             "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
             "SortExec: [b2@1 ASC,a2@0 ASC]",
             "ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
             "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]",
-            "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 0 }, Column { name: \"a\", index: 1 }], 10)",
+            "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 0 }, Column { name: \"a\", index: 1 }], 10), input_partitions=1",
             "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
         ];
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 81f6b2eda..8595aa8ab 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -495,7 +495,7 @@ mod tests {
             "AggregateExec: mode=Final, gby=[], aggr=[]",
             "CoalescePartitionsExec",
             "AggregateExec: mode=Partial, gby=[], aggr=[]",
-            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
         ];
 
@@ -512,7 +512,7 @@ mod tests {
             "CoalescePartitionsExec",
             "AggregateExec: mode=Partial, gby=[], aggr=[]",
             "FilterExec: c1@0",
-            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
         ];
 
@@ -530,7 +530,7 @@ mod tests {
             "LocalLimitExec: fetch=100",
             "FilterExec: c1@0",
             // nothing sorts the data, so the local limit doesn't require sorted data either
-            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
         ];
 
@@ -548,7 +548,7 @@ mod tests {
             "LocalLimitExec: fetch=100",
             "FilterExec: c1@0",
             // nothing sorts the data, so the local limit doesn't require sorted data either
-            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
         ];
 
@@ -598,13 +598,13 @@ mod tests {
             "AggregateExec: mode=Final, gby=[], aggr=[]",
             "CoalescePartitionsExec",
             "AggregateExec: mode=Partial, gby=[], aggr=[]",
-            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
             "GlobalLimitExec: skip=0, fetch=100",
             "CoalescePartitionsExec",
             "LocalLimitExec: fetch=100",
             "FilterExec: c1@0",
             // repartition should happen prior to the filter to maximize parallelism
-            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
             "GlobalLimitExec: skip=0, fetch=100",
             "LocalLimitExec: fetch=100",
             // Expect no repartition to happen for local limit
@@ -625,13 +625,13 @@ mod tests {
             "AggregateExec: mode=Final, gby=[], aggr=[]",
             "CoalescePartitionsExec",
             "AggregateExec: mode=Partial, gby=[], aggr=[]",
-            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
             "GlobalLimitExec: skip=5, fetch=100",
             "CoalescePartitionsExec",
             "LocalLimitExec: fetch=100",
             "FilterExec: c1@0",
             // repartition should happen prior to the filter to maximize parallelism
-            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
             "GlobalLimitExec: skip=0, fetch=100",
             "LocalLimitExec: fetch=100",
             // Expect no repartition to happen for local limit
@@ -671,7 +671,7 @@ mod tests {
         let expected = &[
             "SortPreservingMergeExec: [c1@0 ASC]",
             "SortExec: [c1@0 ASC]",
-            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
         ];
 
@@ -754,7 +754,7 @@ mod tests {
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
             // union input 2: should repartition
             "FilterExec: c1@0",
-            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
         ];
 
@@ -772,7 +772,7 @@ mod tests {
             "SortPreservingMergeExec: [c1@0 ASC]",
             "SortExec: [c1@0 ASC]",
             "ProjectionExec: expr=[c1@0 as c1]",
-            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
         ];
 
@@ -806,7 +806,7 @@ mod tests {
             // Expect repartition on the input to the sort (as it can benefit from additional parallelism)
             "SortExec: [c1@0 ASC]",
             "ProjectionExec: expr=[c1@0 as c1]",
-            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
         ];
 
@@ -824,7 +824,7 @@ mod tests {
             // Expect repartition on the input to the sort (as it can benefit from additional parallelism)
             "SortExec: [c1@0 ASC]",
             "FilterExec: c1@0",
-            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
         ];
 
@@ -846,7 +846,7 @@ mod tests {
             "ProjectionExec: expr=[c1@0 as c1]",
             "FilterExec: c1@0",
             // repartition is lowest down
-            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
         ];
 
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 0417814f3..fc6b4066c 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -322,7 +322,7 @@ pub fn with_new_children_if_necessary(
 ///   assert_eq!("ProjectionExec: expr=[a@0 as a]\
 ///              \n  CoalesceBatchesExec: target_batch_size=8192\
 ///              \n    FilterExec: a@0 < 5\
-///              \n      RepartitionExec: partitioning=RoundRobinBatch(3)\
+///              \n      RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1\
 ///              \n        CsvExec: files={1 group: [[WORKING_DIR/tests/data/example.csv]]}, has_header=true, limit=None, projection=[a]",
 ///               plan_string.trim());
 ///
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index 1d0f1fe5c..8b9d44a99 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -445,7 +445,12 @@ impl ExecutionPlan for RepartitionExec {
     ) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default => {
-                write!(f, "RepartitionExec: partitioning={:?}", self.partitioning)
+                write!(
+                    f,
+                    "RepartitionExec: partitioning={:?}, input_partitions={}",
+                    self.partitioning,
+                    self.input.output_partitioning().partition_count()
+                )
             }
         }
     }
diff --git a/datafusion/core/tests/sql/avro.rs b/datafusion/core/tests/sql/avro.rs
index 202a175b2..58b318abd 100644
--- a/datafusion/core/tests/sql/avro.rs
+++ b/datafusion/core/tests/sql/avro.rs
@@ -150,7 +150,7 @@ async fn avro_explain() {
             \n  AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
             \n    CoalescePartitionsExec\
             \n      AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
-            \n        RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
+            \n        RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\
             \n          AvroExec: files={1 group: [[ARROW_TEST_DATA/avro/alltypes_plain.avro]]}, limit=None\
             \n",
         ],
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 1e4f081f4..1644ca4e7 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -612,11 +612,11 @@ async fn test_physical_plan_display_indent() {
         "      ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]",
         "        AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
         "          CoalesceBatchesExec: target_batch_size=4096",
-        "            RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000)",
+        "            RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000), input_partitions=9000",
         "              AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
         "                CoalesceBatchesExec: target_batch_size=4096",
         "                  FilterExec: c12@1 < 10",
-        "                    RepartitionExec: partitioning=RoundRobinBatch(9000)",
+        "                    RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
         "                      CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c12]",
     ];
 
@@ -656,14 +656,14 @@ async fn test_physical_plan_display_indent_multi_children() {
         "  CoalesceBatchesExec: target_batch_size=4096",
         "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"c1\", index: 0 }, Column { name: \"c2\", index: 0 })]",
         "      CoalesceBatchesExec: target_batch_size=4096",
-        "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000)",
+        "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000), input_partitions=9000",
         "          ProjectionExec: expr=[c1@0 as c1]",
-        "            RepartitionExec: partitioning=RoundRobinBatch(9000)",
+        "            RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
         "              CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]",
         "      CoalesceBatchesExec: target_batch_size=4096",
-        "        RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 9000)",
+        "        RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 9000), input_partitions=9000",
         "          ProjectionExec: expr=[c1@0 as c2]",
-        "            RepartitionExec: partitioning=RoundRobinBatch(9000)",
+        "            RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
         "              CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]",
     ];
 
@@ -705,7 +705,7 @@ async fn csv_explain() {
             "ProjectionExec: expr=[c1@0 as c1]\
               \n  CoalesceBatchesExec: target_batch_size=4096\
               \n    FilterExec: c2@1 > 10\
-              \n      RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
+              \n      RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\
               \n        CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2]\
               \n",
         ]];
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index c73a3ad20..ac02a0d57 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -1883,13 +1883,13 @@ async fn sort_merge_join_on_date32() -> Result<()> {
         "  SortMergeJoin: join_type=Inner, on=[(Column { name: \"c1\", index: 0 }, Column { name: \"c1\", index: 0 })]",
         "    SortExec: [c1@0 ASC]",
         "      CoalesceBatchesExec: target_batch_size=4096",
-        "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2)",
-        "          RepartitionExec: partitioning=RoundRobinBatch(2)",
+        "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
+        "          RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
         "            MemoryExec: partitions=1, partition_sizes=[1]",
         "    SortExec: [c1@0 ASC]",
         "      CoalesceBatchesExec: target_batch_size=4096",
-        "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2)",
-        "          RepartitionExec: partitioning=RoundRobinBatch(2)",
+        "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
+        "          RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
         "            MemoryExec: partitions=1, partition_sizes=[1]",
     ];
     let formatted = displayable(physical_plan.as_ref()).indent().to_string();
@@ -1929,14 +1929,14 @@ async fn sort_merge_join_on_decimal() -> Result<()> {
         "    SortMergeJoin: join_type=Right, on=[(Column { name: \"CAST(t1.c3 AS Decimal128(10, 2))\", index: 4 }, Column { name: \"c3\", index: 2 })]",
         "      SortExec: [CAST(t1.c3 AS Decimal128(10, 2))@4 ASC]",
         "        CoalesceBatchesExec: target_batch_size=4096",
-        "          RepartitionExec: partitioning=Hash([Column { name: \"CAST(t1.c3 AS Decimal128(10, 2))\", index: 4 }], 2)",
+        "          RepartitionExec: partitioning=Hash([Column { name: \"CAST(t1.c3 AS Decimal128(10, 2))\", index: 4 }], 2), input_partitions=2",
         "            ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, CAST(c3@2 AS Decimal128(10, 2)) as CAST(t1.c3 AS Decimal128(10, 2))]",
-        "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+        "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
         "                MemoryExec: partitions=1, partition_sizes=[1]",
         "      SortExec: [c3@2 ASC]",
         "        CoalesceBatchesExec: target_batch_size=4096",
-        "          RepartitionExec: partitioning=Hash([Column { name: \"c3\", index: 2 }], 2)",
-        "            RepartitionExec: partitioning=RoundRobinBatch(2)",
+        "          RepartitionExec: partitioning=Hash([Column { name: \"c3\", index: 2 }], 2), input_partitions=2",
+        "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
         "              MemoryExec: partitions=1, partition_sizes=[1]",
     ];
     let formatted = displayable(physical_plan.as_ref()).indent().to_string();
@@ -1986,13 +1986,13 @@ async fn left_semi_join() -> Result<()> {
                 "      CoalesceBatchesExec: target_batch_size=4096",
                 "        HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
                 "          CoalesceBatchesExec: target_batch_size=4096",
-                "            RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
-                "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "            RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
+                "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
                 "          CoalesceBatchesExec: target_batch_size=4096",
-                "            RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2)",
+                "            RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
                 "              ProjectionExec: expr=[t2_id@0 as t2_id]",
-                "                RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "                RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "                  MemoryExec: partitions=1, partition_sizes=[1]",
             ]
         } else {
@@ -2004,7 +2004,7 @@ async fn left_semi_join() -> Result<()> {
                 "        HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
                 "          MemoryExec: partitions=1, partition_sizes=[1]",
                 "          ProjectionExec: expr=[t2_id@0 as t2_id]",
-                "            RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "              MemoryExec: partitions=1, partition_sizes=[1]",
             ]
         };
@@ -2068,12 +2068,12 @@ async fn left_semi_join() -> Result<()> {
                 "      CoalesceBatchesExec: target_batch_size=4096",
                 "        HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
                 "          CoalesceBatchesExec: target_batch_size=4096",
-                "            RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
-                "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "            RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
+                "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
                 "          CoalesceBatchesExec: target_batch_size=4096",
-                "            RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2)",
-                "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "            RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
+                "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
             ]
         } else {
@@ -2081,7 +2081,7 @@ async fn left_semi_join() -> Result<()> {
                 "SortExec: [t1_id@0 ASC NULLS LAST]",
                 "  CoalescePartitionsExec",
                 "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name]",
-                "      RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "      RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "        CoalesceBatchesExec: target_batch_size=4096",
                 "          HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]",
                 "            MemoryExec: partitions=1, partition_sizes=[1]",
@@ -2237,12 +2237,12 @@ async fn right_semi_join() -> Result<()> {
                   "      CoalesceBatchesExec: target_batch_size=4096",
                   "        HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 1 }, op: NotEq, right: Column { name: \"t1_name\", index: 0 } }",
                   "          CoalesceBatchesExec: target_batch_size=4096",
-                  "            RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2)",
-                  "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                  "            RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
+                  "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                   "                MemoryExec: partitions=1, partition_sizes=[1]",
                   "          CoalesceBatchesExec: target_batch_size=4096",
-                  "            RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
-                  "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                  "            RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
+                  "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                   "                MemoryExec: partitions=1, partition_sizes=[1]",
             ]
         } else {
@@ -2250,7 +2250,7 @@ async fn right_semi_join() -> Result<()> {
                 "SortExec: [t1_id@0 ASC NULLS LAST]",
                 "  CoalescePartitionsExec",
                 "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]",
-                "      RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "      RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "        CoalesceBatchesExec: target_batch_size=4096",
                 "          HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 1 }, op: NotEq, right: Column { name: \"t1_name\", index: 0 } }",
                 "            MemoryExec: partitions=1, partition_sizes=[1]",
@@ -2285,12 +2285,12 @@ async fn right_semi_join() -> Result<()> {
                   "      CoalesceBatchesExec: target_batch_size=4096",
                   "        HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 0 }, op: NotEq, right: Column { name: \"t1_name\", index: 1 } }",
                   "          CoalesceBatchesExec: target_batch_size=4096",
-                  "            RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2)",
-                  "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                  "            RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
+                  "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                   "                MemoryExec: partitions=1, partition_sizes=[1]",
                   "          CoalesceBatchesExec: target_batch_size=4096",
-                  "            RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
-                  "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                  "            RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
+                  "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                   "                MemoryExec: partitions=1, partition_sizes=[1]",
             ]
         } else {
@@ -2298,7 +2298,7 @@ async fn right_semi_join() -> Result<()> {
                 "SortExec: [t1_id@0 ASC NULLS LAST]",
                 "  CoalescePartitionsExec",
                 "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]",
-                "      RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "      RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "        CoalesceBatchesExec: target_batch_size=4096",
                 "          HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 0 }, op: NotEq, right: Column { name: \"t1_name\", index: 1 } }",
                 "            MemoryExec: partitions=1, partition_sizes=[1]",
@@ -2474,14 +2474,14 @@ async fn reduce_cross_join_with_wildcard_and_expr() -> Result<()> {
                 "    CoalesceBatchesExec: target_batch_size=4096",
                 "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(11)\", index: 3 }, Column { name: \"CAST(t2.t2_id AS Int64)\", index: 3 })]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + Int64(11)\", index: 3 }], 2)",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + Int64(11)\", index: 3 }], 2), input_partitions=2",
                 "            ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as t1.t1_id + Int64(11)]",
-                "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"CAST(t2.t2_id AS Int64)\", index: 3 }], 2)",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"CAST(t2.t2_id AS Int64)\", index: 3 }], 2), input_partitions=2",
                 "            ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, CAST(t2_id@0 AS Int64) as CAST(t2.t2_id AS Int64)]",
-                "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
            ]
         } else {
@@ -2492,10 +2492,10 @@ async fn reduce_cross_join_with_wildcard_and_expr() -> Result<()> {
                 "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(11)\", index: 3 }, Column { name: \"CAST(t2.t2_id AS Int64)\", index: 3 })]",
                 "        CoalescePartitionsExec",
                 "          ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as t1.t1_id + Int64(11)]",
-                "            RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "              MemoryExec: partitions=1, partition_sizes=[1]",
                 "        ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, CAST(t2_id@0 AS Int64) as CAST(t2.t2_id AS Int64)]",
-                "          RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "          RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "            MemoryExec: partitions=1, partition_sizes=[1]",
             ]
         };
@@ -2546,14 +2546,14 @@ async fn both_side_expr_key_inner_join() -> Result<()> {
                 "    CoalesceBatchesExec: target_batch_size=4096",
                 "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(12)\", index: 2 }, Column { name: \"t2.t2_id + UInt32(1)\", index: 1 })]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + UInt32(12)\", index: 2 }], 2)",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + UInt32(12)\", index: 2 }], 2), input_partitions=2",
                 "            ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as t1.t1_id + UInt32(12)]",
-                "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id + UInt32(1)\", index: 1 }], 2)",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id + UInt32(1)\", index: 1 }], 2), input_partitions=2",
                 "            ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as t2.t2_id + UInt32(1)]",
-                "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
             ]
         } else {
@@ -2564,10 +2564,10 @@ async fn both_side_expr_key_inner_join() -> Result<()> {
                 "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(12)\", index: 2 }, Column { name: \"t2.t2_id + UInt32(1)\", index: 1 })]",
                 "        CoalescePartitionsExec",
                 "          ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as t1.t1_id + UInt32(12)]",
-                "            RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "              MemoryExec: partitions=1, partition_sizes=[1]",
                 "        ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as t2.t2_id + UInt32(1)]",
-                "          RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "          RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "            MemoryExec: partitions=1, partition_sizes=[1]",
             ]
         };
@@ -2617,25 +2617,25 @@ async fn left_side_expr_key_inner_join() -> Result<()> {
                 "    CoalesceBatchesExec: target_batch_size=4096",
                 "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }], 2)",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }], 2), input_partitions=2",
                 "            ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as t1.t1_id + UInt32(11)]",
-                "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2)",
-                "            RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2",
+                "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "              MemoryExec: partitions=1, partition_sizes=[1]",
            ]
         } else {
             vec![
                 "ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
-                "    RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "      CoalesceBatchesExec: target_batch_size=4096",
                 "        HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
                 "          CoalescePartitionsExec",
                 "            ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as t1.t1_id + UInt32(11)]",
-                "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
                 "          MemoryExec: partitions=1, partition_sizes=[1]",
             ]
@@ -2686,13 +2686,13 @@ async fn right_side_expr_key_inner_join() -> Result<()> {
                 "    CoalesceBatchesExec: target_batch_size=4096",
                 "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 1 })]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
-                "            RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
+                "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "              MemoryExec: partitions=1, partition_sizes=[1]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - UInt32(11)\", index: 1 }], 2)",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - UInt32(11)\", index: 1 }], 2), input_partitions=2",
                 "            ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
-                "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
            ]
         } else {
@@ -2703,7 +2703,7 @@ async fn right_side_expr_key_inner_join() -> Result<()> {
                 "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 1 })]",
                 "        MemoryExec: partitions=1, partition_sizes=[1]",
                 "        ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
-                "          RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "          RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "            MemoryExec: partitions=1, partition_sizes=[1]",
             ]
         };
@@ -2753,13 +2753,13 @@ async fn select_wildcard_with_expr_key_inner_join() -> Result<()> {
                 "    CoalesceBatchesExec: target_batch_size=4096",
                 "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 3 })]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
-                "            RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2",
+                "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "              MemoryExec: partitions=1, partition_sizes=[1]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - UInt32(11)\", index: 3 }], 2)",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - UInt32(11)\", index: 3 }], 2), input_partitions=2",
                 "            ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
-                "              RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
            ]
         } else {
@@ -2770,7 +2770,7 @@ async fn select_wildcard_with_expr_key_inner_join() -> Result<()> {
                 "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 3 })]",
                 "        MemoryExec: partitions=1, partition_sizes=[1]",
                 "        ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
-                "          RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "          RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
                 "            MemoryExec: partitions=1, partition_sizes=[1]",
             ]
         };
diff --git a/datafusion/core/tests/sql/json.rs b/datafusion/core/tests/sql/json.rs
index b6b847382..26ddff61d 100644
--- a/datafusion/core/tests/sql/json.rs
+++ b/datafusion/core/tests/sql/json.rs
@@ -93,7 +93,7 @@ async fn json_explain() {
             \n  AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
             \n    CoalescePartitionsExec\
             \n      AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
-            \n        RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
+            \n        RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\
             \n          JsonExec: limit=None, files={1 group: [[WORKING_DIR/tests/jsons/2.json]]}\n",
         ],
     ];
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index 1167d57a4..24d86d527 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -1682,7 +1682,7 @@ async fn test_window_agg_sort() -> Result<()> {
     let expected = {
         vec![
             "ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum2]",
-            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
             "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
             "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
             "        SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]",
@@ -1714,7 +1714,7 @@ async fn over_order_by_sort_keys_sorting_prefix_compacting() -> Result<()> {
     let expected = {
         vec![
             "ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MI [...]
-            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
             "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
             "      WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
             "        WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
@@ -1749,7 +1749,7 @@ async fn over_order_by_sort_keys_sorting_global_order_compacting() -> Result<()>
             "SortExec: [c2@0 ASC NULLS LAST]",
             "  CoalescePartitionsExec",
             "    ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN U [...]
-            "      RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "      RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
             "        WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
             "          WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
             "            SortExec: [c9@1 ASC NULLS LAST,c2@0 ASC NULLS LAST]",
@@ -1792,12 +1792,12 @@ async fn test_window_partition_by_order_by() -> Result<()> {
             "  BoundedWindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
             "    SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
             "      CoalesceBatchesExec: target_batch_size=4096",
-            "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2)",
+            "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
             "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
             "            SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
             "              CoalesceBatchesExec: target_batch_size=4096",
-            "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }, Column { name: \"c2\", index: 1 }], 2)",
-            "                  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }, Column { name: \"c2\", index: 1 }], 2), input_partitions=2",
+            "                  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
         ]
     };
 
@@ -1830,7 +1830,7 @@ async fn test_window_agg_sort_reversed_plan() -> Result<()> {
     let expected = {
         vec![
             "ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]",
-            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
             "    GlobalLimitExec: skip=0, fetch=5",
             "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
             "        BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
@@ -1886,7 +1886,7 @@ async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> {
     let expected = {
         vec![
             "ProjectionExec: expr=[c9@0 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as lag1, LAG(aggregate_tes [...]
-            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
             "    GlobalLimitExec: skip=0, fetch=5",
             "      BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_ [...]
             "        BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dic [...]
@@ -1938,7 +1938,7 @@ async fn test_window_agg_sort_non_reversed_plan() -> Result<()> {
     let expected = {
         vec![
             "ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as rn2]",
-            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
             "    GlobalLimitExec: skip=0, fetch=5",
             "      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
             "        SortExec: [c9@0 ASC NULLS LAST]",
@@ -1992,7 +1992,7 @@ async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> {
     let expected = {
         vec![
             "ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@5 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWE [...]
-            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
             "    GlobalLimitExec: skip=0, fetch=5",
             "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
             "        SortExec: [c9@2 ASC NULLS LAST,c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
@@ -2080,7 +2080,7 @@ async fn test_window_agg_complex_plan() -> Result<()> {
     let expected = {
         vec![
             "ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@19 as a, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@19 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@4 as c, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@12 as d, SUM(nul [...]
-            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
             "    GlobalLimitExec: skip=0, fetch=5",
             "      WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Prec [...]
             "        WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
@@ -2129,7 +2129,7 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()>
     let expected = {
         vec![
             "ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
-            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
             "    GlobalLimitExec: skip=0, fetch=5",
             "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
             "        BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
@@ -2184,7 +2184,7 @@ async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> {
     let expected = {
         vec![
             "ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum2]",
-            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
             "    GlobalLimitExec: skip=0, fetch=5",
             "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
             "        BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
@@ -2238,7 +2238,7 @@ async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
     let expected = {
         vec![
             "ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as sum2]",
-            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
             "    GlobalLimitExec: skip=0, fetch=5",
             "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_o [...]
             "        WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED [...]
@@ -2297,15 +2297,15 @@ async fn test_remove_unnecessary_sort_in_sub_query() -> Result<()> {
             "  AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]",
             "    CoalescePartitionsExec",
             "      AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]",
-            "        RepartitionExec: partitioning=RoundRobinBatch(8)",
+            "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
             "          CoalescePartitionsExec",
             "            AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]",
             "              CoalesceBatchesExec: target_batch_size=4096",
-            "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 8)",
+            "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 8), input_partitions=8",
             "                  AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]",
             "                    CoalesceBatchesExec: target_batch_size=4096",
             "                      FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
-            "                        RepartitionExec: partitioning=RoundRobinBatch(8)",
+            "                        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
         ]
     };
 
@@ -2351,7 +2351,7 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
     let expected = {
         vec![
             "ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]",
-            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
             "    GlobalLimitExec: skip=0, fetch=5",
             "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
             "        WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]",