Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/27 21:50:22 UTC
[arrow-datafusion] branch master updated: Reorder the physical plan optimizer rules, extract `GlobalSortSelection`, make `Repartition` optional (#4714)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 899c86a0c Reorder the physical plan optimizer rules, extract `GlobalSortSelection`, make `Repartition` optional (#4714)
899c86a0c is described below
commit 899c86a0c62f0c3f6324d3125158ec643b524515
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Wed Dec 28 05:50:17 2022 +0800
Reorder the physical plan optimizer rules, extract `GlobalSortSelection`, make `Repartition` optional (#4714)
* Extract the global sort algorithm selection from the BasicEnforcement to be a separate rule, GlobalSortSelection
* Make the optimizer rule of Repartition optional
* Fix EmptyExec data() method
* Reorder the physical plan optimizer rules
* Refine the UT for the repartition rule by adding the essential BasicEnforcement rule
* Fix UT failure for window function
* Fix example testing
Co-authored-by: yangzhong <ya...@ebay.com>
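For context, here is a minimal sketch of opting out of the now-optional rule through the new configuration key introduced below (a standard `SessionContext` setup is assumed; `set_bool` and `with_config` are existing `SessionConfig`/`SessionContext` APIs):

    use datafusion::prelude::*;

    // Disable the round robin Repartition rule, e.g. when an engine such as
    // Ballista manages partitioning itself.
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.enable_round_robin_repartition", false);
    let ctx = SessionContext::with_config(config);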
---
datafusion-examples/examples/custom_datasource.rs | 2 +-
datafusion/core/src/config.rs | 9 ++
datafusion/core/src/execution/context.rs | 58 ++++++---
.../src/physical_optimizer/coalesce_batches.rs | 13 +-
.../core/src/physical_optimizer/enforcement.rs | 33 +----
.../physical_optimizer/global_sort_selection.rs | 89 +++++++++++++
datafusion/core/src/physical_optimizer/mod.rs | 1 +
.../core/src/physical_optimizer/repartition.rs | 101 +++++++++------
datafusion/core/src/physical_plan/empty.rs | 26 +++-
datafusion/core/src/physical_plan/planner.rs | 5 +
datafusion/core/tests/sql/joins.rs | 10 +-
datafusion/core/tests/sql/timestamp.rs | 24 ++--
datafusion/core/tests/sql/union.rs | 4 +-
datafusion/core/tests/sql/window.rs | 144 ++++++++++++---------
.../test_files/information_schema.slt | 1 +
docs/source/user-guide/configs.md | 1 +
16 files changed, 344 insertions(+), 177 deletions(-)
diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs
index 68e8f5a54..c426d9611 100644
--- a/datafusion-examples/examples/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_datasource.rs
@@ -264,6 +264,6 @@ impl ExecutionPlan for CustomExec {
}
fn statistics(&self) -> Statistics {
- todo!()
+ Statistics::default()
}
}
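(Why this example change is needed: with the reordered rules below, the `Repartition` rule now calls `statistics()` on every plan node to check the estimated row count, so the previous `todo!()` would panic during physical optimization. `Statistics::default()` reports unknown, non-exact statistics, for which that row-count check is simply skipped.)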
diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index 4dabb53d2..976b8c073 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -131,6 +131,10 @@ pub const OPT_PREFER_HASH_JOIN: &str = "datafusion.optimizer.prefer_hash_join";
pub const OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD: &str =
"datafusion.optimizer.hash_join_single_partition_threshold";
+/// Configuration option "datafusion.optimizer.enable_round_robin_repartition"
+pub const OPT_ENABLE_ROUND_ROBIN_REPARTITION: &str =
+ "datafusion.optimizer.enable_round_robin_repartition";
+
/// Definition of a configuration option
pub struct ConfigDefinition {
/// key used to identify this configuration option
@@ -409,6 +413,11 @@ impl BuiltInConfigs {
"The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition",
1024 * 1024,
),
+ ConfigDefinition::new_bool(
+ OPT_ENABLE_ROUND_ROBIN_REPARTITION,
+ "When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism and leverage more CPU cores",
+ true,
+ ),
]
}
}
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 9c909d5d6..db6ab99fb 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -73,7 +73,8 @@ use crate::physical_optimizer::repartition::Repartition;
use crate::config::{
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
- OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES,
+ OPT_ENABLE_ROUND_ROBIN_REPARTITION, OPT_FILTER_NULL_JOIN_KEYS,
+ OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES,
};
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
use crate::physical_optimizer::enforcement::BasicEnforcement;
@@ -98,6 +99,7 @@ use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_
use crate::catalog::listing_schema::ListingSchemaProvider;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::memory_pool::MemoryPool;
+use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
use crate::physical_optimizer::optimize_sorts::OptimizeSorts;
use uuid::Uuid;
@@ -1530,11 +1532,47 @@ impl SessionState {
);
}
- let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
- Arc::new(AggregateStatistics::new()),
- Arc::new(JoinSelection::new()),
- ];
+ // We need to take care of the rule ordering, since the rules may influence each other.
+ let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> =
+ vec![Arc::new(AggregateStatistics::new())];
+ // - In order to increase parallelism, the Repartition rule changes the output
+ //   partitioning of some operators in the plan tree, which influences other rules.
+ //   Therefore, it should be run as early as possible.
+ // - The reasons to make it optional are:
+ //   - it's not used by the distributed engine, Ballista.
+ //   - it conflicts with some parts of the BasicEnforcement, since it
+ //     introduces additional repartitioning while the BasicEnforcement aims at
+ //     reducing unnecessary repartitioning.
+ if config
+ .config_options
+ .get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION)
+ .unwrap_or_default()
+ {
+ physical_optimizers.push(Arc::new(Repartition::new()));
+ }
+ // - Currently the GlobalSortSelection rule depends on the partition number to decide
+ //   whether to change a single-node sort into a parallel local sort plus merge.
+ //   Therefore, it should be run after the Repartition.
+ // - Since it will change the output ordering of some operators, it should be run
+ //   before JoinSelection and BasicEnforcement, which may depend on that ordering.
+ physical_optimizers.push(Arc::new(GlobalSortSelection::new()));
+ // Statistics-based join selection will change the Auto mode to a real join implementation,
+ // like collect left, hash join, or a future sort merge join, which will
+ // influence how the BasicEnforcement decides whether to add additional repartitioning
+ // and local sorts to meet the distribution and ordering requirements.
+ // Therefore, it should be run before BasicEnforcement.
+ physical_optimizers.push(Arc::new(JoinSelection::new()));
+ // The BasicEnforcement rule adds the essential repartitioning and local sorting operators
+ // to satisfy the required distribution and ordering.
+ // Please make sure that the whole plan tree is determined before it runs.
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
+ // `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
+ // However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
+ // These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
+ // rule below performs this analysis and removes unnecessary `SortExec`s.
+ physical_optimizers.push(Arc::new(OptimizeSorts::new()));
+ // CoalesceBatches will not influence the distribution and ordering of the whole plan tree.
+ // Therefore, to avoid influencing other rules, it should be run last.
if config
.config_options
.get_bool(OPT_COALESCE_BATCHES)
@@ -1549,16 +1587,6 @@ impl SessionState {
.unwrap(),
)));
}
- physical_optimizers.push(Arc::new(Repartition::new()));
- // Repartition rule could introduce additional RepartitionExec with RoundRobin partitioning.
- // To make sure the SinglePartition is satisfied, run the BasicEnforcement again, originally it was the AddCoalescePartitionsExec here.
- physical_optimizers.push(Arc::new(BasicEnforcement::new()));
-
- // `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
- // However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
- // These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
- // rule below performs this analysis and removes unnecessary `SortExec`s.
- physical_optimizers.push(Arc::new(OptimizeSorts::new()));
SessionState {
session_id,
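For reference, the resulting physical optimizer rule order assembled above is:

- AggregateStatistics
- Repartition (optional; gated by datafusion.optimizer.enable_round_robin_repartition)
- GlobalSortSelection
- JoinSelection
- BasicEnforcement
- OptimizeSorts
- CoalesceBatches (optional; gated by datafusion.execution.coalesce_batches)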
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index 941c5c141..e0d20be16 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -23,7 +23,7 @@ use crate::{
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{
coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
- repartition::RepartitionExec, rewrite::TreeNodeRewritable,
+ repartition::RepartitionExec, rewrite::TreeNodeRewritable, Partitioning,
},
};
use std::sync::Arc;
@@ -57,7 +57,16 @@ impl PhysicalOptimizerRule for CoalesceBatches {
// See https://github.com/apache/arrow-datafusion/issues/139
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
- || plan_any.downcast_ref::<RepartitionExec>().is_some();
+ // No need to add CoalesceBatchesExec after a round robin RepartitionExec,
+ // since round robin repartitioning forwards whole batches without splitting them
+ || plan_any
+ .downcast_ref::<RepartitionExec>()
+ .map(|repart_exec| {
+ !matches!(
+ repart_exec.partitioning().clone(),
+ Partitioning::RoundRobinBatch(_)
+ )
+ })
+ .unwrap_or(false);
if wrap_in_coalesce {
Ok(Some(Arc::new(CoalesceBatchesExec::new(
plan.clone(),
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index 4a496a3ef..30a796191 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -30,8 +30,7 @@ use crate::physical_plan::joins::{
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::rewrite::TreeNodeRewritable;
-use crate::physical_plan::sorts::sort::{SortExec, SortOptions};
-use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::sorts::sort::SortOptions;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::Partitioning;
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
@@ -844,36 +843,6 @@ fn ensure_distribution_and_ordering(
if plan.children().is_empty() {
return Ok(plan);
}
- // It's mainly for changing the single node global SortExec to
- // the SortPreservingMergeExec with multiple local SortExec.
- // What's more, if limit exists, it can also be pushed down to the local sort
- let plan = plan
- .as_any()
- .downcast_ref::<SortExec>()
- .and_then(|sort_exec| {
- // There are three situations that there's no need for this optimization
- // - There's only one input partition;
- // - It's already preserving the partitioning so that it can be regarded as a local sort
- // - There's no limit pushed down to the local sort (It's still controversial)
- if sort_exec.input().output_partitioning().partition_count() > 1
- && !sort_exec.preserve_partitioning()
- && sort_exec.fetch().is_some()
- {
- let sort = SortExec::new_with_partitioning(
- sort_exec.expr().to_vec(),
- sort_exec.input().clone(),
- true,
- sort_exec.fetch(),
- );
- Some(Arc::new(SortPreservingMergeExec::new(
- sort_exec.expr().to_vec(),
- Arc::new(sort),
- )))
- } else {
- None
- }
- })
- .map_or(plan, |new_plan| new_plan);
let required_input_distributions = plan.required_input_distribution();
let required_input_orderings = plan.required_input_ordering();
diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
new file mode 100644
index 000000000..a6bb8229c
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Select the efficient global sort implementation based on sort details.
+
+use std::sync::Arc;
+
+use crate::error::Result;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::ExecutionPlan;
+use crate::prelude::SessionConfig;
+
+/// Currently for a sort operator, if
+/// - there is more than one input partition
+/// - and there's some limit which can be pushed down to each of its input partitions
+/// then a [SortPreservingMergeExec] over local sorts with the limit pushed down will be preferred;
+/// otherwise, the normal global sort [SortExec] will be used.
+/// Later, a more intelligent statistics-based decision can also be introduced.
+/// For example, for a small data set, the global sort may be efficient enough.
+#[derive(Default)]
+pub struct GlobalSortSelection {}
+
+impl GlobalSortSelection {
+ #[allow(missing_docs)]
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl PhysicalOptimizerRule for GlobalSortSelection {
+ fn optimize(
+ &self,
+ plan: Arc<dyn ExecutionPlan>,
+ _config: &SessionConfig,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ plan.transform_up(&|plan| {
+ Ok(plan
+ .as_any()
+ .downcast_ref::<SortExec>()
+ .and_then(|sort_exec| {
+ if sort_exec.input().output_partitioning().partition_count() > 1
+ && sort_exec.fetch().is_some()
+ // It's already preserving the partitioning so that it can be regarded as a local sort
+ && !sort_exec.preserve_partitioning()
+ {
+ let sort = SortExec::new_with_partitioning(
+ sort_exec.expr().to_vec(),
+ sort_exec.input().clone(),
+ true,
+ sort_exec.fetch(),
+ );
+ let global_sort: Arc<dyn ExecutionPlan> =
+ Arc::new(SortPreservingMergeExec::new(
+ sort_exec.expr().to_vec(),
+ Arc::new(sort),
+ ));
+ Some(global_sort)
+ } else {
+ None
+ }
+ }))
+ })
+ }
+
+ fn name(&self) -> &str {
+ "global_sort_selection"
+ }
+
+ fn schema_check(&self) -> bool {
+ false
+ }
+}
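Schematically, the rewrite performed by this rule (for a sort with a pushed-down fetch over more than one input partition) is:

    before:
      SortExec: [c1@0 ASC], fetch=Some(k)
        <input with N > 1 partitions>

    after:
      SortPreservingMergeExec: [c1@0 ASC]
        SortExec: [c1@0 ASC], fetch=Some(k), preserve_partitioning=true
          <input with N > 1 partitions>

Each partition is sorted (and truncated to at most k rows) locally, and the merge restores the global order.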
diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs
index 0fd0600fb..86ec6f846 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -21,6 +21,7 @@
pub mod aggregate_statistics;
pub mod coalesce_batches;
pub mod enforcement;
+pub mod global_sort_selection;
pub mod join_selection;
pub mod optimize_sorts;
pub mod optimizer;
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 7bdff91ec..2d3f7a0e1 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -143,7 +143,7 @@ impl Repartition {
/// is an intervening node that does not `maintain_input_order`
///
/// if `can_reorder` is false, means that the output of this node
-/// can not be reordered as as something upstream is relying on that order
+/// cannot be reordered, as the final output relies on that order
///
/// If `would_benefit` is false, the upstream operator doesn't
/// benefit from additional repartition
@@ -161,26 +161,6 @@ fn optimize_partitions(
// leaf node - don't replace children
plan
} else {
- let can_reorder_children =
- match (plan.relies_on_input_order(), plan.maintains_input_order()) {
- (true, _) => {
- // `plan` itself relies on the order of its
- // children, so don't reorder them!
- false
- }
- (false, false) => {
- // `plan` may reorder the input itself, so no need
- // to preserve the order of any children
- true
- }
- (false, true) => {
- // `plan` will maintain the order, so we can only
- // repartition children if it is ok to reorder the
- // output of this node
- can_reorder
- }
- };
-
let children = plan
.children()
.iter()
@@ -188,7 +168,7 @@ fn optimize_partitions(
optimize_partitions(
target_partitions,
child.clone(),
- can_reorder_children,
+ can_reorder || child.output_ordering().is_none(),
plan.benefits_from_input_partitioning(),
)
})
@@ -197,7 +177,7 @@ fn optimize_partitions(
};
// decide if we should bother trying to repartition the output of this plan
- let could_repartition = match new_plan.output_partitioning() {
+ let mut could_repartition = match new_plan.output_partitioning() {
// Apply when underlying node has less than `self.target_partitions` amount of concurrency
RoundRobinBatch(x) => x < target_partitions,
UnknownPartitioning(x) => x < target_partitions,
@@ -206,6 +186,13 @@ fn optimize_partitions(
Hash(_, _) => false,
};
+ // No need to repartition when the plan returns at most one row
+ let stats = new_plan.statistics();
+ if stats.is_exact {
+ could_repartition = could_repartition
+ && stats.num_rows.map(|num_rows| num_rows > 1).unwrap_or(true);
+ }
+
if would_benefit && could_repartition && can_reorder {
Ok(Arc::new(RepartitionExec::try_new(
new_plan,
@@ -226,7 +213,12 @@ impl PhysicalOptimizerRule for Repartition {
if config.target_partitions() == 1 {
Ok(plan)
} else {
- optimize_partitions(config.target_partitions(), plan, false, false)
+ optimize_partitions(
+ config.target_partitions(),
+ plan.clone(),
+ plan.output_ordering().is_none(),
+ false,
+ )
}
}
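The net effect of these two changes: the traversal now starts with can_reorder set to whether the final output has no required ordering (plan.output_ordering().is_none()), and a child becomes reorderable as soon as either its parent allows reordering or the child's own output is unordered. A rough restatement of the per-child rule, as a sketch only:

    use datafusion::physical_plan::ExecutionPlan;

    // Sketch only: may a RepartitionExec be inserted above `child`?
    fn child_can_reorder(parent_can_reorder: bool, child: &dyn ExecutionPlan) -> bool {
        parent_can_reorder || child.output_ordering().is_none()
    }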
@@ -246,6 +238,7 @@ mod tests {
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
+ use crate::physical_optimizer::enforcement::BasicEnforcement;
use crate::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
@@ -295,12 +288,20 @@ mod tests {
Arc::new(FilterExec::try_new(col("c1", &schema()).unwrap(), input).unwrap())
}
- fn sort_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+ fn sort_exec(
+ input: Arc<dyn ExecutionPlan>,
+ preserve_partitioning: bool,
+ ) -> Arc<dyn ExecutionPlan> {
let sort_exprs = vec![PhysicalSortExpr {
expr: col("c1", &schema()).unwrap(),
options: SortOptions::default(),
}];
- Arc::new(SortExec::try_new(sort_exprs, input, None).unwrap())
+ Arc::new(SortExec::new_with_partitioning(
+ sort_exprs,
+ input,
+ preserve_partitioning,
+ None,
+ ))
}
fn projection_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
@@ -360,9 +361,16 @@ mod tests {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
// run optimizer
- let optimizer = Repartition {};
- let optimized = optimizer
- .optimize($PLAN, &SessionConfig::new().with_target_partitions(10))?;
+ let config = SessionConfig::new().with_target_partitions(10);
+ let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
+ Arc::new(Repartition::new()),
+ // The `BasicEnforcement` is an essential rule to apply here;
+ // otherwise, the correctness of the generated optimized plan cannot be guaranteed.
+ Arc::new(BasicEnforcement::new()),
+ ];
+ let optimized = optimizers.into_iter().fold($PLAN, |plan, optimizer| {
+ optimizer.optimize(plan, &config).unwrap()
+ });
// Now format correctly
let plan = displayable(optimized.as_ref()).indent().to_string();
@@ -382,6 +390,7 @@ mod tests {
let expected = [
"AggregateExec: mode=Final, gby=[], aggr=[]",
+ "CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
@@ -397,6 +406,7 @@ mod tests {
let expected = &[
"AggregateExec: mode=Final, gby=[], aggr=[]",
+ "CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
@@ -413,6 +423,7 @@ mod tests {
let expected = &[
"GlobalLimitExec: skip=0, fetch=100",
+ "CoalescePartitionsExec",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
// nothing sorts the data, so the local limit doesn't require sorted data either
@@ -430,6 +441,7 @@ mod tests {
let expected = &[
"GlobalLimitExec: skip=5, fetch=100",
+ "CoalescePartitionsExec",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
// nothing sorts the data, so the local limit doesn't require sorted data either
@@ -443,7 +455,7 @@ mod tests {
#[test]
fn repartition_sorted_limit() -> Result<()> {
- let plan = limit_exec(sort_exec(parquet_exec()));
+ let plan = limit_exec(sort_exec(parquet_exec(), false));
let expected = &[
"GlobalLimitExec: skip=0, fetch=100",
@@ -459,7 +471,7 @@ mod tests {
#[test]
fn repartition_sorted_limit_with_filter() -> Result<()> {
- let plan = limit_exec(filter_exec(sort_exec(parquet_exec())));
+ let plan = limit_exec(filter_exec(sort_exec(parquet_exec(), false)));
let expected = &[
"GlobalLimitExec: skip=0, fetch=100",
@@ -481,9 +493,11 @@ mod tests {
let expected = &[
"AggregateExec: mode=Final, gby=[], aggr=[]",
+ "CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
"GlobalLimitExec: skip=0, fetch=100",
+ "CoalescePartitionsExec",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
// repartition should happen prior to the filter to maximize parallelism
@@ -506,9 +520,11 @@ mod tests {
let expected = &[
"AggregateExec: mode=Final, gby=[], aggr=[]",
+ "CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
"GlobalLimitExec: skip=5, fetch=100",
+ "CoalescePartitionsExec",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
// repartition should happen prior to the filter to maximize parallelism
@@ -527,7 +543,8 @@ mod tests {
#[test]
fn repartition_ignores_union() -> Result<()> {
- let plan = Arc::new(UnionExec::new(vec![parquet_exec(); 5]));
+ let plan: Arc<dyn ExecutionPlan> =
+ Arc::new(UnionExec::new(vec![parquet_exec(); 5]));
let expected = &[
"UnionExec",
@@ -549,7 +566,8 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
- // Expect no repartition of SortPreservingMergeExec
+ "SortExec: [c1@0 ASC]",
+ "RepartitionExec: partitioning=RoundRobinBatch(10)",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];
@@ -563,9 +581,9 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
- // Expect no repartition of SortPreservingMergeExec
- // even though there is a projection exec between it
+ "SortExec: [c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
+ "RepartitionExec: partitioning=RoundRobinBatch(10)",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];
@@ -575,7 +593,8 @@ mod tests {
#[test]
fn repartition_transitively_past_sort_with_projection() -> Result<()> {
- let plan = sort_preserving_merge_exec(sort_exec(projection_exec(parquet_exec())));
+ let plan =
+ sort_preserving_merge_exec(sort_exec(projection_exec(parquet_exec()), true));
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
@@ -592,7 +611,8 @@ mod tests {
#[test]
fn repartition_transitively_past_sort_with_filter() -> Result<()> {
- let plan = sort_preserving_merge_exec(sort_exec(filter_exec(parquet_exec())));
+ let plan =
+ sort_preserving_merge_exec(sort_exec(filter_exec(parquet_exec()), true));
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
@@ -609,9 +629,10 @@ mod tests {
#[test]
fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> {
- let plan = sort_preserving_merge_exec(sort_exec(projection_exec(filter_exec(
- parquet_exec(),
- ))));
+ let plan = sort_preserving_merge_exec(sort_exec(
+ projection_exec(filter_exec(parquet_exec())),
+ true,
+ ));
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs
index 4751dade1..b71f6739b 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -24,7 +24,7 @@ use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning,
};
-use arrow::array::NullArray;
+use arrow::array::{ArrayRef, NullArray};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use log::debug;
@@ -68,13 +68,25 @@ impl EmptyExec {
fn data(&self) -> Result<Vec<RecordBatch>> {
let batch = if self.produce_one_row {
+ let n_field = self.schema.fields.len();
+ // hack for https://github.com/apache/arrow-datafusion/pull/3242
+ let n_field = if n_field == 0 { 1 } else { n_field };
vec![RecordBatch::try_new(
- Arc::new(Schema::new(vec![Field::new(
- "placeholder",
- DataType::Null,
- true,
- )])),
- vec![Arc::new(NullArray::new(1))],
+ Arc::new(Schema::new(
+ (0..n_field)
+ .into_iter()
+ .map(|i| {
+ Field::new(format!("placeholder_{}", i), DataType::Null, true)
+ })
+ .collect(),
+ )),
+ (0..n_field)
+ .into_iter()
+ .map(|_i| {
+ let ret: ArrayRef = Arc::new(NullArray::new(1));
+ ret
+ })
+ .collect(),
)?]
} else {
vec![]
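A small illustrative sketch of the fixed behavior (the two-field schema here is hypothetical):

    use std::sync::Arc;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::physical_plan::empty::EmptyExec;

    // With produce_one_row = true, EmptyExec now emits one batch whose
    // column count matches the schema: null columns placeholder_0 and
    // placeholder_1, rather than a single hard-coded "placeholder" column.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    let empty = EmptyExec::new(true, schema);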
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index e16c518b6..768c42978 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1792,6 +1792,11 @@ impl DefaultPhysicalPlanner {
new_plan.schema()
)));
}
+ trace!(
+ "Optimized physical plan by {}:\n{}\n",
+ optimizer.name(),
+ displayable(new_plan.as_ref()).indent()
+ );
observer(new_plan.as_ref(), optimizer.as_ref())
}
debug!(
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index 7e701b56d..b6c78b0cf 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -754,7 +754,7 @@ async fn cross_join() {
assert_eq!(4 * 4, actual.len());
- let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2";
+ let sql = "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2 ORDER BY t1_id";
let actual = execute(&ctx, sql).await;
assert_eq!(4 * 4, actual.len());
@@ -2201,8 +2201,8 @@ async fn right_semi_join() -> Result<()> {
"SortExec: [t1_id@0 ASC NULLS LAST]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 1 }, op: NotEq, right: Column { name: \"t1_name\", index: 0 } }",
" MemoryExec: partitions=1, partition_sizes=[1]",
" MemoryExec: partitions=1, partition_sizes=[1]",
@@ -2539,8 +2539,8 @@ async fn left_side_expr_key_inner_join() -> Result<()> {
vec![
"ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + CAST(11 AS UInt32) as t1.t1_id + Int64(11)]",
diff --git a/datafusion/core/tests/sql/timestamp.rs b/datafusion/core/tests/sql/timestamp.rs
index 33ef1e51b..885adda47 100644
--- a/datafusion/core/tests/sql/timestamp.rs
+++ b/datafusion/core/tests/sql/timestamp.rs
@@ -557,7 +557,7 @@ async fn timestamp_coercion() -> Result<()> {
ctx.register_table("table_a", table_a)?;
ctx.register_table("table_b", table_b)?;
- let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+---------------------------+-------------------------------+-------------------------+",
@@ -584,7 +584,7 @@ async fn timestamp_coercion() -> Result<()> {
ctx.register_table("table_a", table_a)?;
ctx.register_table("table_b", table_b)?;
- let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+---------------------+----------------------------+-------------------------+",
@@ -612,7 +612,7 @@ async fn timestamp_coercion() -> Result<()> {
ctx.register_table("table_a", table_a)?;
ctx.register_table("table_b", table_b)?;
- let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+---------------------+----------------------------+-------------------------+",
@@ -639,7 +639,7 @@ async fn timestamp_coercion() -> Result<()> {
ctx.register_table("table_a", table_a)?;
ctx.register_table("table_b", table_b)?;
- let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+-------------------------+---------------------+-------------------------+",
@@ -666,7 +666,7 @@ async fn timestamp_coercion() -> Result<()> {
ctx.register_table("table_a", table_a)?;
ctx.register_table("table_b", table_b)?;
- let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+-------------------------+----------------------------+-------------------------+",
@@ -693,7 +693,7 @@ async fn timestamp_coercion() -> Result<()> {
ctx.register_table("table_a", table_a)?;
ctx.register_table("table_b", table_b)?;
- let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+-------------------------+----------------------------+-------------------------+",
@@ -720,7 +720,7 @@ async fn timestamp_coercion() -> Result<()> {
ctx.register_table("table_a", table_a)?;
ctx.register_table("table_b", table_b)?;
- let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+----------------------------+---------------------+-------------------------+",
@@ -747,7 +747,7 @@ async fn timestamp_coercion() -> Result<()> {
ctx.register_table("table_a", table_a)?;
ctx.register_table("table_b", table_b)?;
- let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+----------------------------+-------------------------+-------------------------+",
@@ -774,7 +774,7 @@ async fn timestamp_coercion() -> Result<()> {
ctx.register_table("table_a", table_a)?;
ctx.register_table("table_b", table_b)?;
- let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+----------------------------+----------------------------+-------------------------+",
@@ -801,7 +801,7 @@ async fn timestamp_coercion() -> Result<()> {
ctx.register_table("table_a", table_a)?;
ctx.register_table("table_b", table_b)?;
- let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+----------------------------+---------------------+-------------------------+",
@@ -828,7 +828,7 @@ async fn timestamp_coercion() -> Result<()> {
ctx.register_table("table_a", table_a)?;
ctx.register_table("table_b", table_b)?;
- let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+----------------------------+-------------------------+-------------------------+",
@@ -855,7 +855,7 @@ async fn timestamp_coercion() -> Result<()> {
ctx.register_table("table_a", table_a)?;
ctx.register_table("table_b", table_b)?;
- let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b order by table_a.ts desc, table_b.ts desc";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+----------------------------+----------------------------+-------------------------+",
diff --git a/datafusion/core/tests/sql/union.rs b/datafusion/core/tests/sql/union.rs
index d5519ce2d..29856a37b 100644
--- a/datafusion/core/tests/sql/union.rs
+++ b/datafusion/core/tests/sql/union.rs
@@ -86,7 +86,7 @@ async fn union_schemas() -> Result<()> {
SessionContext::with_config(SessionConfig::new().with_information_schema(true));
let result = ctx
- .sql("SELECT 1 A UNION ALL SELECT 2")
+ .sql("SELECT 1 A UNION ALL SELECT 2 order by 1")
.await
.unwrap()
.collect()
@@ -105,7 +105,7 @@ async fn union_schemas() -> Result<()> {
assert_batches_eq!(expected, &result);
let result = ctx
- .sql("SELECT 1 UNION SELECT 2")
+ .sql("SELECT 1 UNION SELECT 2 order by 1")
.await
.unwrap()
.collect()
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index 41278e120..32438b610 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -1633,7 +1633,10 @@ async fn test_window_frame_nth_value_aggregate() -> Result<()> {
#[tokio::test]
async fn test_window_agg_sort() -> Result<()> {
- let ctx = SessionContext::new();
+ // We need to specify the target partition number.
+ // Otherwise, the default value may vary across environments with
+ // different CPU core counts, which may cause UT failures.
+ let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
register_aggregate_csv(&ctx).await?;
let sql = "SELECT
c9,
@@ -1649,9 +1652,10 @@ async fn test_window_agg_sort() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum2]",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+ " SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]",
]
};
@@ -1681,10 +1685,11 @@ async fn over_order_by_sort_keys_sorting_prefix_compacting() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c2@3 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@0 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MI [...]
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
- " WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
- " WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]"
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+ " WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+ " WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+ " SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]"
]
};
@@ -1778,7 +1783,7 @@ async fn test_window_partition_by_order_by() -> Result<()> {
#[tokio::test]
async fn test_window_agg_sort_reversed_plan() -> Result<()> {
- let ctx = SessionContext::new();
+ let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
register_aggregate_csv(&ctx).await?;
let sql = "SELECT
c9,
@@ -1795,10 +1800,11 @@ async fn test_window_agg_sort_reversed_plan() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
- " SortExec: [c9@0 DESC]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " GlobalLimitExec: skip=0, fetch=5",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+ " SortExec: [c9@0 DESC]",
]
};
@@ -1830,7 +1836,7 @@ async fn test_window_agg_sort_reversed_plan() -> Result<()> {
#[tokio::test]
async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> {
- let ctx = SessionContext::new();
+ let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
register_aggregate_csv(&ctx).await?;
let sql = "SELECT
c9,
@@ -1851,10 +1857,11 @@ async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c9@6 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as lag1, LAG(aggregate_tes [...]
- " GlobalLimitExec: skip=0, fetch=5",
- " WindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, di [...]
- " WindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, [...]
- " SortExec: [c9@0 DESC]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " GlobalLimitExec: skip=0, fetch=5",
+ " WindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, [...]
+ " WindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0 [...]
+ " SortExec: [c9@0 DESC]",
]
};
@@ -1886,7 +1893,7 @@ async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> {
#[tokio::test]
async fn test_window_agg_sort_non_reversed_plan() -> Result<()> {
- let ctx = SessionContext::new();
+ let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
register_aggregate_csv(&ctx).await?;
let sql = "SELECT
c9,
@@ -1903,11 +1910,12 @@ async fn test_window_agg_sort_non_reversed_plan() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c9@2 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as rn2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
- " SortExec: [c9@1 ASC NULLS LAST]",
- " WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
- " SortExec: [c9@0 DESC]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " GlobalLimitExec: skip=0, fetch=5",
+ " WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+ " SortExec: [c9@1 ASC NULLS LAST]",
+ " WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+ " SortExec: [c9@0 DESC]",
]
};
@@ -1939,7 +1947,7 @@ async fn test_window_agg_sort_non_reversed_plan() -> Result<()> {
#[tokio::test]
async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> {
- let ctx = SessionContext::new();
+ let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
register_aggregate_csv(&ctx).await?;
let sql = "SELECT
c9,
@@ -1957,12 +1965,13 @@ async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c9@5 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum2, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWE [...]
- " GlobalLimitExec: skip=0, fetch=5",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
- " SortExec: [c9@4 ASC NULLS LAST,c1@2 ASC NULLS LAST,c2@3 ASC NULLS LAST]",
- " WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
- " SortExec: [c9@2 DESC,c1@0 DESC]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " GlobalLimitExec: skip=0, fetch=5",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+ " SortExec: [c9@4 ASC NULLS LAST,c1@2 ASC NULLS LAST,c2@3 ASC NULLS LAST]",
+ " WindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+ " SortExec: [c9@2 DESC,c1@0 DESC]",
]
};
@@ -1994,7 +2003,7 @@ async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> {
#[tokio::test]
async fn test_window_agg_complex_plan() -> Result<()> {
- let ctx = SessionContext::new();
+ let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
register_aggregate_null_cases_csv(&ctx).await?;
let sql = "SELECT
SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as a,
@@ -2045,18 +2054,19 @@ async fn test_window_agg_complex_plan() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@0 as a, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@0 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@15 as c, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@7 as d, SUM(null_ [...]
- " GlobalLimitExec: skip=0, fetch=5",
- " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preced [...]
- " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c3@17 ASC NULLS LAST,c2@16 ASC NULLS LAST]",
- " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c3@16 ASC NULLS LAST,c1@14 ASC]",
- " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
- " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_b [...]
- " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start [...]
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " GlobalLimitExec: skip=0, fetch=5",
+ " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Prec [...]
+ " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
+ " SortExec: [c3@17 ASC NULLS LAST,c2@16 ASC NULLS LAST]",
+ " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
+ " SortExec: [c3@16 ASC NULLS LAST,c1@14 ASC]",
+ " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
+ " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start [...]
" WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, sta [...]
- " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c3@2 DESC,c1@0 ASC NULLS LAST]",
+ " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, s [...]
+ " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
+ " SortExec: [c3@2 DESC,c1@0 ASC NULLS LAST]",
]
};
@@ -2074,7 +2084,9 @@ async fn test_window_agg_complex_plan() -> Result<()> {
#[tokio::test]
async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()> {
- let config = SessionConfig::new().with_repartition_windows(false);
+ let config = SessionConfig::new()
+ .with_repartition_windows(false)
+ .with_target_partitions(2);
let ctx = SessionContext::with_config(config);
register_aggregate_csv(&ctx).await?;
let sql = "SELECT
@@ -2092,10 +2104,11 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()>
let expected = {
vec![
"ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " GlobalLimitExec: skip=0, fetch=5",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
- " SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+ " SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]",
]
};
@@ -2127,7 +2140,9 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()>
#[tokio::test]
async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> {
- let config = SessionConfig::new().with_repartition_windows(false);
+ let config = SessionConfig::new()
+ .with_repartition_windows(false)
+ .with_target_partitions(2);
let ctx = SessionContext::with_config(config);
register_aggregate_csv(&ctx).await?;
let sql = "SELECT
@@ -2145,10 +2160,11 @@ async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c9@3 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
- " SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " GlobalLimitExec: skip=0, fetch=5",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+ " SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]",
]
};
@@ -2180,7 +2196,9 @@ async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> {
#[tokio::test]
async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
- let config = SessionConfig::new().with_repartition_windows(false);
+ let config = SessionConfig::new()
+ .with_repartition_windows(false)
+ .with_target_partitions(2);
let ctx = SessionContext::with_config(config);
register_aggregate_csv(&ctx).await?;
let sql = "SELECT c3,
@@ -2197,10 +2215,11 @@ async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
let expected = {
vec![
"ProjectionExec: expr=[c3@3 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ord [...]
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED P [...]
- " SortExec: [CAST(c3@1 AS Int16) + c4@2 DESC,c9@3 DESC,c2@0 ASC NULLS LAST]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " GlobalLimitExec: skip=0, fetch=5",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_o [...]
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED [...]
+ " SortExec: [CAST(c3@1 AS Int16) + c4@2 DESC,c9@3 DESC,c2@0 ASC NULLS LAST]",
]
};
@@ -2291,7 +2310,9 @@ async fn test_remove_unnecessary_sort_in_sub_query() -> Result<()> {
#[tokio::test]
async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Result<()> {
- let config = SessionConfig::new().with_repartition_windows(false);
+ let config = SessionConfig::new()
+ .with_repartition_windows(false)
+ .with_target_partitions(2);
let ctx = SessionContext::with_config(config);
register_aggregate_csv(&ctx).await?;
let sql = "SELECT c3,
@@ -2308,10 +2329,11 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
let expected = {
vec![
"ProjectionExec: expr=[c3@3 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " GlobalLimitExec: skip=0, fetch=5",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]",
+ " SortExec: [c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]",
]
};
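Note on the test churn above: these window tests assert the full indented physical plan as a vector of strings, so a single new "RepartitionExec: partitioning=RoundRobinBatch(2)" node re-indents everything beneath it and shows up as the wholesale -/+ blocks seen here. A minimal sketch of the comparison pattern, assuming `ctx`, `sql`, and `expected` are in scope as in these tests:

    use datafusion::physical_plan::displayable;

    // Build the physical plan for the statement under test.
    let dataframe = ctx.sql(sql).await?;
    let physical_plan = dataframe.create_physical_plan().await?;

    // Render one string per operator, indented by depth, and compare
    // line-for-line against the expected vector.
    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
    let actual: Vec<&str> = formatted.trim().lines().collect();
    assert_eq!(expected, actual, "expected:\n{expected:#?}\nactual:\n{actual:#?}");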
diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
index 3ca74588d..da2db8de6 100644
--- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
@@ -120,6 +120,7 @@ datafusion.execution.target_partitions 7
datafusion.execution.time_zone +00:00
datafusion.explain.logical_plan_only false
datafusion.explain.physical_plan_only false
+datafusion.optimizer.enable_round_robin_repartition true
datafusion.optimizer.filter_null_join_keys false
datafusion.optimizer.hash_join_single_partition_threshold 1048576
datafusion.optimizer.max_passes 3
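The new datafusion.optimizer.enable_round_robin_repartition row above is served from information_schema.df_settings, which this slt test enumerates. A sketch of the equivalent check from Rust, assuming information_schema is enabled on the session:

    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        // df_settings only exists when information_schema is enabled.
        let config = SessionConfig::new().with_information_schema(true);
        let ctx = SessionContext::with_config(config);

        // The new option should be listed with its default value of true.
        ctx.sql(
            "SELECT name, setting FROM information_schema.df_settings \
             WHERE name = 'datafusion.optimizer.enable_round_robin_repartition'",
        )
        .await?
        .show()
        .await?;
        Ok(())
    }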
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 039981338..1c5f08656 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -56,6 +56,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| then extract the hour. |
| datafusion.explain.logical_plan_only | Boolean | false | When set to true, the explain statement will only print logical plans. |
| datafusion.explain.physical_plan_only | Boolean | false | When set to true, the explain statement will only print physical plans. |
+| datafusion.optimizer.enable_round_robin_repartition | Boolean | true | When set to true, the physical plan optimizer will try to add round-robin repartitioning to increase parallelism and make use of more CPU cores. |
| datafusion.optimizer.filter_null_join_keys | Boolean | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |
| datafusion.optimizer.hash_join_single_partition_threshold | UInt64 | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |
| datafusion.optimizer.max_passes | UInt64 | 3 | Number of times that the optimizer will attempt to optimize the plan |
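Since the docs table only records the default, here is a minimal sketch of opting out of the new rule programmatically, assuming the OPT_ENABLE_ROUND_ROBIN_REPARTITION constant added in config.rs above and the builder-style set_bool on SessionConfig:

    use datafusion::config::OPT_ENABLE_ROUND_ROBIN_REPARTITION;
    use datafusion::prelude::*;

    // Disable round-robin repartitioning: the Repartition rule then leaves
    // the plan's partitioning untouched even when more cores are available.
    let config = SessionConfig::new()
        .set_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION, false)
        .with_target_partitions(8);
    let ctx = SessionContext::with_config(config);
    // ... register tables and run queries as usual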