You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/27 21:50:22 UTC

[arrow-datafusion] branch master updated: Reorder the physical plan optimizer rules, extract `GlobalSortSelection`, make `Repartition` optional (#4714)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 899c86a0c Reorder the physical plan optimizer rules, extract `GlobalSortSelection`, make `Repartition` optional (#4714)
899c86a0c is described below

commit 899c86a0c62f0c3f6324d3125158ec643b524515
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Wed Dec 28 05:50:17 2022 +0800

    Reorder the physical plan optimizer rules, extract `GlobalSortSelection`, make `Repartition` optional (#4714)
    
    * Extract the global sort algorithm selection from the BasicEnforcement to be a separate rule, GlobalSortSelection
    
    * Make the optimizer rule of Repartition optional
    
    * Fix EmptyExec data() method
    
    * Reorder the physical plan optimizer rules
    
    * Refine the UT for the repartition rule by adding the essential BasicEnforcement rule
    
    * Fix UT failure for window function
    
    * Fix example testing
    
    Co-authored-by: yangzhong <ya...@ebay.com>
---
 datafusion-examples/examples/custom_datasource.rs  |   2 +-
 datafusion/core/src/config.rs                      |   9 ++
 datafusion/core/src/execution/context.rs           |  58 ++++++---
 .../src/physical_optimizer/coalesce_batches.rs     |  13 +-
 .../core/src/physical_optimizer/enforcement.rs     |  33 +----
 .../physical_optimizer/global_sort_selection.rs    |  89 +++++++++++++
 datafusion/core/src/physical_optimizer/mod.rs      |   1 +
 .../core/src/physical_optimizer/repartition.rs     | 101 +++++++++------
 datafusion/core/src/physical_plan/empty.rs         |  26 +++-
 datafusion/core/src/physical_plan/planner.rs       |   5 +
 datafusion/core/tests/sql/joins.rs                 |  10 +-
 datafusion/core/tests/sql/timestamp.rs             |  24 ++--
 datafusion/core/tests/sql/union.rs                 |   4 +-
 datafusion/core/tests/sql/window.rs                | 144 ++++++++++++---------
 .../test_files/information_schema.slt              |   1 +
 docs/source/user-guide/configs.md                  |   1 +
 16 files changed, 344 insertions(+), 177 deletions(-)

diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs
index 68e8f5a54..c426d9611 100644
--- a/datafusion-examples/examples/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_datasource.rs
@@ -264,6 +264,6 @@ impl ExecutionPlan for CustomExec {
     }
 
     fn statistics(&self) -> Statistics {
-        todo!()
+        Statistics::default()
     }
 }
diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index 4dabb53d2..976b8c073 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -131,6 +131,10 @@ pub const OPT_PREFER_HASH_JOIN: &str = "datafusion.optimizer.prefer_hash_join";
 pub const OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD: &str =
     "datafusion.optimizer.hash_join_single_partition_threshold";
 
+/// Configuration option "datafusion.optimizer.enable_round_robin_repartition"
+pub const OPT_ENABLE_ROUND_ROBIN_REPARTITION: &str =
+    "datafusion.optimizer.enable_round_robin_repartition";
+
 /// Definition of a configuration option
 pub struct ConfigDefinition {
     /// key used to identifier this configuration option
@@ -409,6 +413,11 @@ impl BuiltInConfigs {
                  "The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition",
                  1024 * 1024,
              ),
+             ConfigDefinition::new_bool(
+                 OPT_ENABLE_ROUND_ROBIN_REPARTITION,
+                 "When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores",
+                 true,
+             ),
             ]
         }
     }
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 9c909d5d6..db6ab99fb 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -73,7 +73,8 @@ use crate::physical_optimizer::repartition::Repartition;
 
 use crate::config::{
     ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
-    OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES,
+    OPT_ENABLE_ROUND_ROBIN_REPARTITION, OPT_FILTER_NULL_JOIN_KEYS,
+    OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES,
 };
 use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
 use crate::physical_optimizer::enforcement::BasicEnforcement;
@@ -98,6 +99,7 @@ use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_
 use crate::catalog::listing_schema::ListingSchemaProvider;
 use crate::datasource::object_store::ObjectStoreUrl;
 use crate::execution::memory_pool::MemoryPool;
+use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
 use crate::physical_optimizer::optimize_sorts::OptimizeSorts;
 use uuid::Uuid;
 
@@ -1530,11 +1532,47 @@ impl SessionState {
             );
         }
 
-        let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
-            Arc::new(AggregateStatistics::new()),
-            Arc::new(JoinSelection::new()),
-        ];
+        // We need to take care of the rule ordering. They may influence each other.
+        let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> =
+            vec![Arc::new(AggregateStatistics::new())];
+        // - To increase parallelism, the Repartition rule changes the output partitioning
+        // of some operators in the plan tree, which will influence other rules.
+        // Therefore, it should be run as early as possible.
+        // - The reasons to make it optional are:
+        //      - it's not used by the distributed engine, Ballista.
+        //      - it conflicts with some parts of the BasicEnforcement, since it
+        //      introduces additional repartitioning while the BasicEnforcement aims at
+        //      reducing unnecessary repartitioning.
+        if config
+            .config_options
+            .get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION)
+            .unwrap_or_default()
+        {
+            physical_optimizers.push(Arc::new(Repartition::new()));
+        }
+        // - GlobalSortSelection currently depends on the partition count to decide whether to
+        // change the single node sort to parallel local sort and merge. Therefore, it should be
+        // run after the Repartition.
+        // - Since it will change the output ordering of some operators, it should be run
+        // before JoinSelection and BasicEnforcement, which may depend on that.
+        physical_optimizers.push(Arc::new(GlobalSortSelection::new()));
+        // Statistics-based join selection will change the Auto mode to a real join implementation,
+        // like collect left, or hash join, or future sort merge join, which will
+        // influence the BasicEnforcement to decide whether to add additional repartition
+        // and local sort to meet the distribution and ordering requirements.
+        // Therefore, it should be run before BasicEnforcement.
+        physical_optimizers.push(Arc::new(JoinSelection::new()));
+        // BasicEnforcement adds the repartition and local sort operators needed to satisfy
+        // the required distribution and ordering.
+        // Please make sure that the whole plan tree is determined before it runs.
         physical_optimizers.push(Arc::new(BasicEnforcement::new()));
+        // `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
+        // However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
+        // These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
+        // rule below performs this analysis and removes unnecessary `SortExec`s.
+        physical_optimizers.push(Arc::new(OptimizeSorts::new()));
+        // CoalesceBatches does not influence the distribution and ordering of the plan tree.
+        // Therefore, to avoid influencing other rules, it should be run last.
         if config
             .config_options
             .get_bool(OPT_COALESCE_BATCHES)
@@ -1549,16 +1587,6 @@ impl SessionState {
                     .unwrap(),
             )));
         }
-        physical_optimizers.push(Arc::new(Repartition::new()));
-        // Repartition rule could introduce additional RepartitionExec with RoundRobin partitioning.
-        // To make sure the SinglePartition is satisfied, run the BasicEnforcement again, originally it was the AddCoalescePartitionsExec here.
-        physical_optimizers.push(Arc::new(BasicEnforcement::new()));
-
-        // `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
-        // However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
-        // These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
-        // rule below performs this analysis and removes unnecessary `SortExec`s.
-        physical_optimizers.push(Arc::new(OptimizeSorts::new()));
 
         SessionState {
             session_id,
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index 941c5c141..e0d20be16 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -23,7 +23,7 @@ use crate::{
     physical_optimizer::PhysicalOptimizerRule,
     physical_plan::{
         coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
-        repartition::RepartitionExec, rewrite::TreeNodeRewritable,
+        repartition::RepartitionExec, rewrite::TreeNodeRewritable, Partitioning,
     },
 };
 use std::sync::Arc;
@@ -57,7 +57,16 @@ impl PhysicalOptimizerRule for CoalesceBatches {
             // See https://github.com/apache/arrow-datafusion/issues/139
             let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
                 || plan_any.downcast_ref::<HashJoinExec>().is_some()
-                || plan_any.downcast_ref::<RepartitionExec>().is_some();
+                // Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
+                || plan_any
+                    .downcast_ref::<RepartitionExec>()
+                    .map(|repart_exec| {
+                        !matches!(
+                            repart_exec.partitioning().clone(),
+                            Partitioning::RoundRobinBatch(_)
+                        )
+                    })
+                    .unwrap_or(false);
             if wrap_in_coalesce {
                 Ok(Some(Arc::new(CoalesceBatchesExec::new(
                     plan.clone(),
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index 4a496a3ef..30a796191 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -30,8 +30,7 @@ use crate::physical_plan::joins::{
 use crate::physical_plan::projection::ProjectionExec;
 use crate::physical_plan::repartition::RepartitionExec;
 use crate::physical_plan::rewrite::TreeNodeRewritable;
-use crate::physical_plan::sorts::sort::{SortExec, SortOptions};
-use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::sorts::sort::SortOptions;
 use crate::physical_plan::windows::WindowAggExec;
 use crate::physical_plan::Partitioning;
 use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
@@ -844,36 +843,6 @@ fn ensure_distribution_and_ordering(
     if plan.children().is_empty() {
         return Ok(plan);
     }
-    // It's mainly for changing the single node global SortExec to
-    // the SortPreservingMergeExec with multiple local SortExec.
-    // What's more, if limit exists, it can also be pushed down to the local sort
-    let plan = plan
-        .as_any()
-        .downcast_ref::<SortExec>()
-        .and_then(|sort_exec| {
-            // There are three situations that there's no need for this optimization
-            // - There's only one input partition;
-            // - It's already preserving the partitioning so that it can be regarded as a local sort
-            // - There's no limit pushed down to the local sort (It's still controversial)
-            if sort_exec.input().output_partitioning().partition_count() > 1
-                && !sort_exec.preserve_partitioning()
-                && sort_exec.fetch().is_some()
-            {
-                let sort = SortExec::new_with_partitioning(
-                    sort_exec.expr().to_vec(),
-                    sort_exec.input().clone(),
-                    true,
-                    sort_exec.fetch(),
-                );
-                Some(Arc::new(SortPreservingMergeExec::new(
-                    sort_exec.expr().to_vec(),
-                    Arc::new(sort),
-                )))
-            } else {
-                None
-            }
-        })
-        .map_or(plan, |new_plan| new_plan);
 
     let required_input_distributions = plan.required_input_distribution();
     let required_input_orderings = plan.required_input_ordering();
diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
new file mode 100644
index 000000000..a6bb8229c
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Select the efficient global sort implementation based on sort details.
+
+use std::sync::Arc;
+
+use crate::error::Result;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::ExecutionPlan;
+use crate::prelude::SessionConfig;
+
+/// Currently, for a sort operator, if
+/// - there is more than one input partition (and the sort doesn't already preserve partitioning),
+/// - and there's a limit (fetch) that can be pushed down to each of its input partitions,
+/// then a [SortPreservingMergeExec] over local sorts with the limit pushed down is preferred;
+/// otherwise, the normal global sort [SortExec] is used.
+/// More intelligent, statistics-based decisions may be introduced later.
+/// For example, for a small data set, the global sort may be efficient enough.
+#[derive(Default)]
+pub struct GlobalSortSelection {}
+
+impl GlobalSortSelection {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for GlobalSortSelection {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &SessionConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        plan.transform_up(&|plan| {
+            Ok(plan
+                .as_any()
+                .downcast_ref::<SortExec>()
+                .and_then(|sort_exec| {
+                    if sort_exec.input().output_partitioning().partition_count() > 1
+                        && sort_exec.fetch().is_some()
+                        // It's already preserving the partitioning so that it can be regarded as a local sort
+                        && !sort_exec.preserve_partitioning()
+                    {
+                        let sort = SortExec::new_with_partitioning(
+                            sort_exec.expr().to_vec(),
+                            sort_exec.input().clone(),
+                            true,
+                            sort_exec.fetch(),
+                        );
+                        let global_sort: Arc<dyn ExecutionPlan> =
+                            Arc::new(SortPreservingMergeExec::new(
+                                sort_exec.expr().to_vec(),
+                                Arc::new(sort),
+                            ));
+                        Some(global_sort)
+                    } else {
+                        None
+                    }
+                }))
+        })
+    }
+
+    fn name(&self) -> &str {
+        "global_sort_selection"
+    }
+
+    fn schema_check(&self) -> bool {
+        false
+    }
+}
diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs
index 0fd0600fb..86ec6f846 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -21,6 +21,7 @@
 pub mod aggregate_statistics;
 pub mod coalesce_batches;
 pub mod enforcement;
+pub mod global_sort_selection;
 pub mod join_selection;
 pub mod optimize_sorts;
 pub mod optimizer;
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 7bdff91ec..2d3f7a0e1 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -143,7 +143,7 @@ impl Repartition {
 /// is an intervening node that does not `maintain_input_order`
 ///
 /// if `can_reorder` is false, means that the output of this node
-/// can not be reordered as as something upstream is relying on that order
+/// can not be reordered because the final output relies on that order
 ///
 /// If 'would_benefit` is false, the upstream operator doesn't
 ///  benefit from additional repartition
@@ -161,26 +161,6 @@ fn optimize_partitions(
         // leaf node - don't replace children
         plan
     } else {
-        let can_reorder_children =
-            match (plan.relies_on_input_order(), plan.maintains_input_order()) {
-                (true, _) => {
-                    // `plan` itself relies on the order of its
-                    // children, so don't reorder them!
-                    false
-                }
-                (false, false) => {
-                    // `plan` may reorder the input itself, so no need
-                    // to preserve the order of any children
-                    true
-                }
-                (false, true) => {
-                    // `plan` will maintain the order, so we can only
-                    // repartition children if it is ok to reorder the
-                    // output of this node
-                    can_reorder
-                }
-            };
-
         let children = plan
             .children()
             .iter()
@@ -188,7 +168,7 @@ fn optimize_partitions(
                 optimize_partitions(
                     target_partitions,
                     child.clone(),
-                    can_reorder_children,
+                    can_reorder || child.output_ordering().is_none(),
                     plan.benefits_from_input_partitioning(),
                 )
             })
@@ -197,7 +177,7 @@ fn optimize_partitions(
     };
 
     // decide if we should bother trying to repartition the output of this plan
-    let could_repartition = match new_plan.output_partitioning() {
+    let mut could_repartition = match new_plan.output_partitioning() {
         // Apply when underlying node has less than `self.target_partitions` amount of concurrency
         RoundRobinBatch(x) => x < target_partitions,
         UnknownPartitioning(x) => x < target_partitions,
@@ -206,6 +186,13 @@ fn optimize_partitions(
         Hash(_, _) => false,
     };
 
+    // No need to repartition when the exact row count is known to be at most 1
+    let stats = new_plan.statistics();
+    if stats.is_exact {
+        could_repartition = could_repartition
+            && stats.num_rows.map(|num_rows| num_rows > 1).unwrap_or(true);
+    }
+
     if would_benefit && could_repartition && can_reorder {
         Ok(Arc::new(RepartitionExec::try_new(
             new_plan,
@@ -226,7 +213,12 @@ impl PhysicalOptimizerRule for Repartition {
         if config.target_partitions() == 1 {
             Ok(plan)
         } else {
-            optimize_partitions(config.target_partitions(), plan, false, false)
+            optimize_partitions(
+                config.target_partitions(),
+                plan.clone(),
+                plan.output_ordering().is_none(),
+                false,
+            )
         }
     }
 
@@ -246,6 +238,7 @@ mod tests {
     use super::*;
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
+    use crate::physical_optimizer::enforcement::BasicEnforcement;
     use crate::physical_plan::aggregates::{
         AggregateExec, AggregateMode, PhysicalGroupBy,
     };
@@ -295,12 +288,20 @@ mod tests {
         Arc::new(FilterExec::try_new(col("c1", &schema()).unwrap(), input).unwrap())
     }
 
-    fn sort_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+    fn sort_exec(
+        input: Arc<dyn ExecutionPlan>,
+        preserve_partitioning: bool,
+    ) -> Arc<dyn ExecutionPlan> {
         let sort_exprs = vec![PhysicalSortExpr {
             expr: col("c1", &schema()).unwrap(),
             options: SortOptions::default(),
         }];
-        Arc::new(SortExec::try_new(sort_exprs, input, None).unwrap())
+        Arc::new(SortExec::new_with_partitioning(
+            sort_exprs,
+            input,
+            preserve_partitioning,
+            None,
+        ))
     }
 
     fn projection_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
@@ -360,9 +361,16 @@ mod tests {
             let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
 
             // run optimizer
-            let optimizer = Repartition {};
-            let optimized = optimizer
-                .optimize($PLAN, &SessionConfig::new().with_target_partitions(10))?;
+            let config = SessionConfig::new().with_target_partitions(10);
+            let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
+                Arc::new(Repartition::new()),
+                // The `BasicEnforcement` is an essential rule to be applied.
+                // Otherwise, the correctness of the generated optimized plan cannot be guaranteed
+                Arc::new(BasicEnforcement::new()),
+            ];
+            let optimized = optimizers.into_iter().fold($PLAN, |plan, optimizer| {
+                optimizer.optimize(plan, &config).unwrap()
+            });
 
             // Now format correctly
             let plan = displayable(optimized.as_ref()).indent().to_string();
@@ -382,6 +390,7 @@ mod tests {
 
         let expected = [
             "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "CoalescePartitionsExec",
             "AggregateExec: mode=Partial, gby=[], aggr=[]",
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
@@ -397,6 +406,7 @@ mod tests {
 
         let expected = &[
             "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "CoalescePartitionsExec",
             "AggregateExec: mode=Partial, gby=[], aggr=[]",
             "FilterExec: c1@0",
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
@@ -413,6 +423,7 @@ mod tests {
 
         let expected = &[
             "GlobalLimitExec: skip=0, fetch=100",
+            "CoalescePartitionsExec",
             "LocalLimitExec: fetch=100",
             "FilterExec: c1@0",
             // nothing sorts the data, so the local limit doesn't require sorted data either
@@ -430,6 +441,7 @@ mod tests {
 
         let expected = &[
             "GlobalLimitExec: skip=5, fetch=100",
+            "CoalescePartitionsExec",
             "LocalLimitExec: fetch=100",
             "FilterExec: c1@0",
             // nothing sorts the data, so the local limit doesn't require sorted data either
@@ -443,7 +455,7 @@ mod tests {
 
     #[test]
     fn repartition_sorted_limit() -> Result<()> {
-        let plan = limit_exec(sort_exec(parquet_exec()));
+        let plan = limit_exec(sort_exec(parquet_exec(), false));
 
         let expected = &[
             "GlobalLimitExec: skip=0, fetch=100",
@@ -459,7 +471,7 @@ mod tests {
 
     #[test]
     fn repartition_sorted_limit_with_filter() -> Result<()> {
-        let plan = limit_exec(filter_exec(sort_exec(parquet_exec())));
+        let plan = limit_exec(filter_exec(sort_exec(parquet_exec(), false)));
 
         let expected = &[
             "GlobalLimitExec: skip=0, fetch=100",
@@ -481,9 +493,11 @@ mod tests {
 
         let expected = &[
             "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "CoalescePartitionsExec",
             "AggregateExec: mode=Partial, gby=[], aggr=[]",
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
             "GlobalLimitExec: skip=0, fetch=100",
+            "CoalescePartitionsExec",
             "LocalLimitExec: fetch=100",
             "FilterExec: c1@0",
             // repartition should happen prior to the filter to maximize parallelism
@@ -506,9 +520,11 @@ mod tests {
 
         let expected = &[
             "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "CoalescePartitionsExec",
             "AggregateExec: mode=Partial, gby=[], aggr=[]",
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
             "GlobalLimitExec: skip=5, fetch=100",
+            "CoalescePartitionsExec",
             "LocalLimitExec: fetch=100",
             "FilterExec: c1@0",
             // repartition should happen prior to the filter to maximize parallelism
@@ -527,7 +543,8 @@ mod tests {
 
     #[test]
     fn repartition_ignores_union() -> Result<()> {
-        let plan = Arc::new(UnionExec::new(vec![parquet_exec(); 5]));
+        let plan: Arc<dyn ExecutionPlan> =
+            Arc::new(UnionExec::new(vec![parquet_exec(); 5]));
 
         let expected = &[
             "UnionExec",
@@ -549,7 +566,8 @@ mod tests {
 
         let expected = &[
             "SortPreservingMergeExec: [c1@0 ASC]",
-            // Expect no repartition of SortPreservingMergeExec
+            "SortExec: [c1@0 ASC]",
+            "RepartitionExec: partitioning=RoundRobinBatch(10)",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
         ];
 
@@ -563,9 +581,9 @@ mod tests {
 
         let expected = &[
             "SortPreservingMergeExec: [c1@0 ASC]",
-            // Expect no repartition of SortPreservingMergeExec
-            // even though there is a projection exec between it
+            "SortExec: [c1@0 ASC]",
             "ProjectionExec: expr=[c1@0 as c1]",
+            "RepartitionExec: partitioning=RoundRobinBatch(10)",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
         ];
 
@@ -575,7 +593,8 @@ mod tests {
 
     #[test]
     fn repartition_transitively_past_sort_with_projection() -> Result<()> {
-        let plan = sort_preserving_merge_exec(sort_exec(projection_exec(parquet_exec())));
+        let plan =
+            sort_preserving_merge_exec(sort_exec(projection_exec(parquet_exec()), true));
 
         let expected = &[
             "SortPreservingMergeExec: [c1@0 ASC]",
@@ -592,7 +611,8 @@ mod tests {
 
     #[test]
     fn repartition_transitively_past_sort_with_filter() -> Result<()> {
-        let plan = sort_preserving_merge_exec(sort_exec(filter_exec(parquet_exec())));
+        let plan =
+            sort_preserving_merge_exec(sort_exec(filter_exec(parquet_exec()), true));
 
         let expected = &[
             "SortPreservingMergeExec: [c1@0 ASC]",
@@ -609,9 +629,10 @@ mod tests {
 
     #[test]
     fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> {
-        let plan = sort_preserving_merge_exec(sort_exec(projection_exec(filter_exec(
-            parquet_exec(),
-        ))));
+        let plan = sort_preserving_merge_exec(sort_exec(
+            projection_exec(filter_exec(parquet_exec())),
+            true,
+        ));
 
         let expected = &[
             "SortPreservingMergeExec: [c1@0 ASC]",
diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs
index 4751dade1..b71f6739b 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -24,7 +24,7 @@ use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{
     memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning,
 };
-use arrow::array::NullArray;
+use arrow::array::{ArrayRef, NullArray};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use log::debug;
@@ -68,13 +68,25 @@ impl EmptyExec {
 
     fn data(&self) -> Result<Vec<RecordBatch>> {
         let batch = if self.produce_one_row {
+            let n_field = self.schema.fields.len();
+            // hack for https://github.com/apache/arrow-datafusion/pull/3242
+            let n_field = if n_field == 0 { 1 } else { n_field };
             vec![RecordBatch::try_new(
-                Arc::new(Schema::new(vec![Field::new(
-                    "placeholder",
-                    DataType::Null,
-                    true,
-                )])),
-                vec![Arc::new(NullArray::new(1))],
+                Arc::new(Schema::new(
+                    (0..n_field)
+                        .into_iter()
+                        .map(|i| {
+                            Field::new(format!("placeholder_{}", i), DataType::Null, true)
+                        })
+                        .collect(),
+                )),
+                (0..n_field)
+                    .into_iter()
+                    .map(|_i| {
+                        let ret: ArrayRef = Arc::new(NullArray::new(1));
+                        ret
+                    })
+                    .collect(),
             )?]
         } else {
             vec![]
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index e16c518b6..768c42978 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1792,6 +1792,11 @@ impl DefaultPhysicalPlanner {
                         new_plan.schema()
                     )));
             }
+            trace!(
+                "Optimized physical plan by {}:\n{}\n",
+                optimizer.name(),
+                displayable(new_plan.as_ref()).indent()
+            );
             observer(new_plan.as_ref(), optimizer.as_ref())
         }
         debug!(
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index 7e701b56d..b6c78b0cf 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -754,7 +754,7 @@ async fn cross_join() {
 
         assert_eq!(4 * 4, actual.len());
 
-        let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2";
+        let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2 ORDER BY t1_id";
 
         let actual = execute(&ctx, sql).await;
         assert_eq!(4 * 4, actual.len());
@@ -2201,8 +2201,8 @@ async fn right_semi_join() -> Result<()> {
                 "SortExec: [t1_id@0 ASC NULLS LAST]",
                 "  CoalescePartitionsExec",
                 "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]",
-                "      CoalesceBatchesExec: target_batch_size=4096",
-                "        RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "      RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "        CoalesceBatchesExec: target_batch_size=4096",
                 "          HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 1 }, op: NotEq, right: Column { name: \"t1_name\", index: 0 } }",
                 "            MemoryExec: partitions=1, partition_sizes=[1]",
                 "            MemoryExec: partitions=1, partition_sizes=[1]",
@@ -2539,8 +2539,8 @@ async fn left_side_expr_key_inner_join() -> Result<()> {
             vec![
                 "ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
-                "    CoalesceBatchesExec: target_batch_size=4096",
-                "      RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "    RepartitionExec: partitioning=RoundRobinBatch(2)",
+                "      CoalesceBatchesExec: target_batch_size=4096",
                 "        HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
                 "          CoalescePartitionsExec",
                 "            ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + CAST(11 AS UInt32) as t1.t1_id + Int64(11)]",
diff --git a/datafusion/core/tests/sql/timestamp.rs b/datafusion/core/tests/sql/timestamp.rs
index 33ef1e51b..885adda47 100644
--- a/datafusion/core/tests/sql/timestamp.rs
+++ b/datafusion/core/tests/sql/timestamp.rs
@@ -557,7 +557,7 @@ async fn timestamp_coercion() -> Result<()> {
         ctx.register_table("table_a", table_a)?;
         ctx.register_table("table_b", table_b)?;
 
-        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
         let actual = execute_to_batches(&ctx, sql).await;
         let expected = vec![
             "+---------------------------+-------------------------------+-------------------------+",
@@ -584,7 +584,7 @@ async fn timestamp_coercion() -> Result<()> {
         ctx.register_table("table_a", table_a)?;
         ctx.register_table("table_b", table_b)?;
 
-        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
         let actual = execute_to_batches(&ctx, sql).await;
         let expected = vec![
             "+---------------------+----------------------------+-------------------------+",
@@ -612,7 +612,7 @@ async fn timestamp_coercion() -> Result<()> {
         ctx.register_table("table_a", table_a)?;
         ctx.register_table("table_b", table_b)?;
 
-        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
         let actual = execute_to_batches(&ctx, sql).await;
         let expected = vec![
             "+---------------------+----------------------------+-------------------------+",
@@ -639,7 +639,7 @@ async fn timestamp_coercion() -> Result<()> {
         ctx.register_table("table_a", table_a)?;
         ctx.register_table("table_b", table_b)?;
 
-        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
         let actual = execute_to_batches(&ctx, sql).await;
         let expected = vec![
             "+-------------------------+---------------------+-------------------------+",
@@ -666,7 +666,7 @@ async fn timestamp_coercion() -> Result<()> {
         ctx.register_table("table_a", table_a)?;
         ctx.register_table("table_b", table_b)?;
 
-        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
         let actual = execute_to_batches(&ctx, sql).await;
         let expected = vec![
             "+-------------------------+----------------------------+-------------------------+",
@@ -693,7 +693,7 @@ async fn timestamp_coercion() -> Result<()> {
         ctx.register_table("table_a", table_a)?;
         ctx.register_table("table_b", table_b)?;
 
-        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
         let actual = execute_to_batches(&ctx, sql).await;
         let expected = vec![
             "+-------------------------+----------------------------+-------------------------+",
@@ -720,7 +720,7 @@ async fn timestamp_coercion() -> Result<()> {
         ctx.register_table("table_a", table_a)?;
         ctx.register_table("table_b", table_b)?;
 
-        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
         let actual = execute_to_batches(&ctx, sql).await;
         let expected = vec![
             "+----------------------------+---------------------+-------------------------+",
@@ -747,7 +747,7 @@ async fn timestamp_coercion() -> Result<()> {
         ctx.register_table("table_a", table_a)?;
         ctx.register_table("table_b", table_b)?;
 
-        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
         let actual = execute_to_batches(&ctx, sql).await;
         let expected = vec![
             "+----------------------------+-------------------------+-------------------------+",
@@ -774,7 +774,7 @@ async fn timestamp_coercion() -> Result<()> {
         ctx.register_table("table_a", table_a)?;
         ctx.register_table("table_b", table_b)?;
 
-        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
         let actual = execute_to_batches(&ctx, sql).await;
         let expected = vec![
             "+----------------------------+----------------------------+-------------------------+",
@@ -801,7 +801,7 @@ async fn timestamp_coercion() -> Result<()> {
         ctx.register_table("table_a", table_a)?;
         ctx.register_table("table_b", table_b)?;
 
-        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
         let actual = execute_to_batches(&ctx, sql).await;
         let expected = vec![
             "+----------------------------+---------------------+-------------------------+",
@@ -828,7 +828,7 @@ async fn timestamp_coercion() -> Result<()> {
         ctx.register_table("table_a", table_a)?;
         ctx.register_table("table_b", table_b)?;
 
-        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
         let actual = execute_to_batches(&ctx, sql).await;
         let expected = vec![
             "+----------------------------+-------------------------+-------------------------+",
@@ -855,7 +855,7 @@ async fn timestamp_coercion() -> Result<()> {
         ctx.register_table("table_a", table_a)?;
         ctx.register_table("table_b", table_b)?;
 
-        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
         let actual = execute_to_batches(&ctx, sql).await;
         let expected = vec![
             "+----------------------------+----------------------------+-------------------------+",
diff --git a/datafusion/core/tests/sql/union.rs b/datafusion/core/tests/sql/union.rs
index d5519ce2d..29856a37b 100644
--- a/datafusion/core/tests/sql/union.rs
+++ b/datafusion/core/tests/sql/union.rs
@@ -86,7 +86,7 @@ async fn union_schemas() -> Result<()> {
         SessionContext::with_config(SessionConfig::new().with_information_schema(true));
 
     let result = ctx
-        .sql("SELECT 1 A UNION ALL SELECT 2")
+        .sql("SELECT 1 A UNION ALL SELECT 2 order by 1")
         .await
         .unwrap()
         .collect()
@@ -105,7 +105,7 @@ async fn union_schemas() -> Result<()> {
     assert_batches_eq!(expected, &result);
 
     let result = ctx
-        .sql("SELECT 1 UNION SELECT 2")
+        .sql("SELECT 1 UNION SELECT 2 order by 1")
         .await
         .unwrap()
         .collect()
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index 41278e120..32438b610 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -1633,7 +1633,10 @@ async fn test_window_frame_nth_value_aggregate() -> Result<()> {
 
 #[tokio::test]
 async fn test_window_agg_sort() -> Result<()> {
-    let ctx = SessionContext::new();
+    // We need to specify the target partition number.
+    // Otherwise, the default value used may vary on different environment
+    // with different cpu core number, which may cause the UT failure.
+    let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
     register_aggregate_csv(&ctx).await?;
     let sql = "SELECT
       c9,
@@ -1649,9 +1652,10 @@ async fn test_window_agg_sort() -> Result<()> {
     let expected = {
         vec![
             "ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum2]",
-            "  WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
             "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "      SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]",
+            "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+            "        SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]",
         ]
     };
 
@@ -1681,10 +1685,11 @@ async fn over_order_by_sort_keys_sorting_prefix_compacting() -> Result<()> {
     let expected = {
         vec![
             "ProjectionExec: expr=[c2@3 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@0 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MI [...]
-            "  WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
-            "    WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "      WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]"
+            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "      WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+            "        WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+            "          SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]"
         ]
     };
 
@@ -1778,7 +1783,7 @@ async fn test_window_partition_by_order_by() -> Result<()> {
 
 #[tokio::test]
 async fn test_window_agg_sort_reversed_plan() -> Result<()> {
-    let ctx = SessionContext::new();
+    let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
     register_aggregate_csv(&ctx).await?;
     let sql = "SELECT
     c9,
@@ -1795,10 +1800,11 @@ async fn test_window_agg_sort_reversed_plan() -> Result<()> {
     let expected = {
         vec![
             "ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
-            "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "        SortExec: [c9@0 DESC]",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "    GlobalLimitExec: skip=0, fetch=5",
+            "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
+            "        WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+            "          SortExec: [c9@0 DESC]",
         ]
     };
 
@@ -1830,7 +1836,7 @@ async fn test_window_agg_sort_reversed_plan() -> Result<()> {
 
 #[tokio::test]
 async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> {
-    let ctx = SessionContext::new();
+    let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
     register_aggregate_csv(&ctx).await?;
     let sql = "SELECT
     c9,
@@ -1851,10 +1857,11 @@ async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> {
     let expected = {
         vec![
             "ProjectionExec: expr=[c9@6 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as lag1, LAG(aggregate_tes [...]
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    WindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, di [...]
-            "      WindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0,  [...]
-            "        SortExec: [c9@0 DESC]",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "    GlobalLimitExec: skip=0, fetch=5",
+            "      WindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0,  [...]
+            "        WindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0 [...]
+            "          SortExec: [c9@0 DESC]",
         ]
     };
 
@@ -1886,7 +1893,7 @@ async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> {
 
 #[tokio::test]
 async fn test_window_agg_sort_non_reversed_plan() -> Result<()> {
-    let ctx = SessionContext::new();
+    let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
     register_aggregate_csv(&ctx).await?;
     let sql = "SELECT
     c9,
@@ -1903,11 +1910,12 @@ async fn test_window_agg_sort_non_reversed_plan() -> Result<()> {
     let expected = {
         vec![
             "ProjectionExec: expr=[c9@2 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as rn2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "      SortExec: [c9@1 ASC NULLS LAST]",
-            "        WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "          SortExec: [c9@0 DESC]",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "    GlobalLimitExec: skip=0, fetch=5",
+            "      WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+            "        SortExec: [c9@1 ASC NULLS LAST]",
+            "          WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+            "            SortExec: [c9@0 DESC]",
         ]
     };
 
@@ -1939,7 +1947,7 @@ async fn test_window_agg_sort_non_reversed_plan() -> Result<()> {
 
 #[tokio::test]
 async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> {
-    let ctx = SessionContext::new();
+    let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
     register_aggregate_csv(&ctx).await?;
     let sql = "SELECT
     c9,
@@ -1957,12 +1965,13 @@ async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> {
     let expected = {
         vec![
             "ProjectionExec: expr=[c9@5 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum2, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWE [...]
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "      SortExec: [c9@4 ASC NULLS LAST,c1@2 ASC NULLS LAST,c2@3 ASC NULLS LAST]",
-            "        WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "          WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "            SortExec: [c9@2 DESC,c1@0 DESC]",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "    GlobalLimitExec: skip=0, fetch=5",
+            "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+            "        SortExec: [c9@4 ASC NULLS LAST,c1@2 ASC NULLS LAST,c2@3 ASC NULLS LAST]",
+            "          WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+            "            WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+            "              SortExec: [c9@2 DESC,c1@0 DESC]",
         ]
     };
 
@@ -1994,7 +2003,7 @@ async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> {
 
 #[tokio::test]
 async fn test_window_agg_complex_plan() -> Result<()> {
-    let ctx = SessionContext::new();
+    let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
     register_aggregate_null_cases_csv(&ctx).await?;
     let sql = "SELECT
     SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as a,
@@ -2045,18 +2054,19 @@ async fn test_window_agg_complex_plan() -> Result<()> {
     let expected = {
         vec![
             "ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@0 as a, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@0 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@15 as c, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@7 as d, SUM(null_ [...]
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preced [...]
-            "      WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: [c3@17 ASC NULLS LAST,c2@16 ASC NULLS LAST]",
-            "          WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
-            "            SortExec: [c3@16 ASC NULLS LAST,c1@14 ASC]",
-            "              WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
-            "                WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_b [...]
-            "                  WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start [...]
+            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "    GlobalLimitExec: skip=0, fetch=5",
+            "      WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Prec [...]
+            "        WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
+            "          SortExec: [c3@17 ASC NULLS LAST,c2@16 ASC NULLS LAST]",
+            "            WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
+            "              SortExec: [c3@16 ASC NULLS LAST,c1@14 ASC]",
+            "                WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
+            "                  WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start [...]
             "                    WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, sta [...]
-            "                      WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
-            "                        SortExec: [c3@2 DESC,c1@0 ASC NULLS LAST]",
+            "                      WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, s [...]
+            "                        WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
+            "                          SortExec: [c3@2 DESC,c1@0 ASC NULLS LAST]",
         ]
     };
 
@@ -2074,7 +2084,9 @@ async fn test_window_agg_complex_plan() -> Result<()> {
 
 #[tokio::test]
 async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()> {
-    let config = SessionConfig::new().with_repartition_windows(false);
+    let config = SessionConfig::new()
+        .with_repartition_windows(false)
+        .with_target_partitions(2);
     let ctx = SessionContext::with_config(config);
     register_aggregate_csv(&ctx).await?;
     let sql = "SELECT
@@ -2092,10 +2104,11 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()>
     let expected = {
         vec![
             "ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "    GlobalLimitExec: skip=0, fetch=5",
             "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "        SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]",
+            "        WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+            "          SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]",
         ]
     };
 
@@ -2127,7 +2140,9 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()>
 
 #[tokio::test]
 async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> {
-    let config = SessionConfig::new().with_repartition_windows(false);
+    let config = SessionConfig::new()
+        .with_repartition_windows(false)
+        .with_target_partitions(2);
     let ctx = SessionContext::with_config(config);
     register_aggregate_csv(&ctx).await?;
     let sql = "SELECT
@@ -2145,10 +2160,11 @@ async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> {
     let expected = {
         vec![
             "ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
-            "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "        SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "    GlobalLimitExec: skip=0, fetch=5",
+            "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
+            "        WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+            "          SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]",
         ]
     };
 
@@ -2180,7 +2196,9 @@ async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> {
 
 #[tokio::test]
 async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
-    let config = SessionConfig::new().with_repartition_windows(false);
+    let config = SessionConfig::new()
+        .with_repartition_windows(false)
+        .with_target_partitions(2);
     let ctx = SessionContext::with_config(config);
     register_aggregate_csv(&ctx).await?;
     let sql = "SELECT c3,
@@ -2197,10 +2215,11 @@ async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
     let expected = {
         vec![
             "ProjectionExec: expr=[c3@3 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ord [...]
-            "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED P [...]
-            "        SortExec: [CAST(c3@1 AS Int16) + c4@2 DESC,c9@3 DESC,c2@0 ASC NULLS LAST]",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "    GlobalLimitExec: skip=0, fetch=5",
+            "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_o [...]
+            "        WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED [...]
+            "          SortExec: [CAST(c3@1 AS Int16) + c4@2 DESC,c9@3 DESC,c2@0 ASC NULLS LAST]",
         ]
     };
 
@@ -2291,7 +2310,9 @@ async fn test_remove_unnecessary_sort_in_sub_query() -> Result<()> {
 
 #[tokio::test]
 async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Result<()> {
-    let config = SessionConfig::new().with_repartition_windows(false);
+    let config = SessionConfig::new()
+        .with_repartition_windows(false)
+        .with_target_partitions(2);
     let ctx = SessionContext::with_config(config);
     register_aggregate_csv(&ctx).await?;
     let sql = "SELECT c3,
@@ -2308,10 +2329,11 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
     let expected = {
         vec![
             "ProjectionExec: expr=[c3@3 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: [c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]",
+            "  RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "    GlobalLimitExec: skip=0, fetch=5",
+            "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+            "        WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]",
+            "          SortExec: [c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]",
         ]
     };
 
diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
index 3ca74588d..da2db8de6 100644
--- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
@@ -120,6 +120,7 @@ datafusion.execution.target_partitions 7
 datafusion.execution.time_zone +00:00
 datafusion.explain.logical_plan_only false
 datafusion.explain.physical_plan_only false
+datafusion.optimizer.enable_round_robin_repartition true
 datafusion.optimizer.filter_null_join_keys false
 datafusion.optimizer.hash_join_single_partition_threshold 1048576
 datafusion.optimizer.max_passes 3
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 039981338..1c5f08656 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -56,6 +56,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | then extract the hour.                                    |
 | datafusion.explain.logical_plan_only                      | Boolean | false   | When set to true, the explain statement will only print logical plans.                                                                                                                                                                                                                                                                                        |
 | datafusion.explain.physical_plan_only                     | Boolean | false   | When set to true, the explain statement will only print physical plans.                                                                                                                                                                                                                                                                                       |
+| datafusion.optimizer.enable_round_robin_repartition       | Boolean | true    | When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores                                                                                                                                                                                                                      |
 | datafusion.optimizer.filter_null_join_keys                | Boolean | false   | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.                                                                                               |
 | datafusion.optimizer.hash_join_single_partition_threshold | UInt64  | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition                                                                                                                                                                                                                                                |
 | datafusion.optimizer.max_passes                           | UInt64  | 3       | Number of times that the optimizer will attempt to optimize the plan                                                                                                                                                                                                                                                                                          |