You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost when this text was extracted.)
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/20 18:35:01 UTC
[arrow-datafusion] branch main updated: MINOR: Add maintains input order flag to CoalesceBatches (#6730)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new da9d2616d3 MINOR: Add maintains input order flag to CoalesceBatches (#6730)
da9d2616d3 is described below
commit da9d2616d38aefd949a3d96595d3e80778f9fe2a
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Tue Jun 20 21:34:55 2023 +0300
MINOR: Add maintains input order flag to CoalesceBatches (#6730)
* add maintains input order flag to CoalesceBatches
* minor changes
---
.../core/src/physical_optimizer/sort_enforcement.rs | 20 +++++++++++++++-----
.../core/src/physical_plan/coalesce_batches.rs | 4 ++++
2 files changed, 19 insertions(+), 5 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index a79552de49..727380adfc 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -971,6 +971,7 @@ mod tests {
use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
use crate::physical_plan::aggregates::PhysicalGroupBy;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
+ use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::joins::utils::JoinOn;
use crate::physical_plan::joins::SortMergeJoinExec;
@@ -1462,8 +1463,11 @@ mod tests {
},
)];
let sort = sort_exec(sort_exprs.clone(), source);
+ // Add dummy layer propagating Sort above, to test whether sort can be removed from multi layer before
+ let coalesce_batches = coalesce_batches_exec(sort);
- let window_agg = bounded_window_exec("non_nullable_col", sort_exprs, sort);
+ let window_agg =
+ bounded_window_exec("non_nullable_col", sort_exprs, coalesce_batches);
let sort_exprs = vec![sort_expr_options(
"non_nullable_col",
@@ -1491,16 +1495,18 @@ mod tests {
" FilterExec: NOT non_nullable_col@1",
" SortExec: expr=[non_nullable_col@1 ASC NULLS LAST]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }], mode=[Sorted]",
- " SortExec: expr=[non_nullable_col@1 DESC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " CoalesceBatchesExec: target_batch_size=128",
+ " SortExec: expr=[non_nullable_col@1 DESC]",
+ " MemoryExec: partitions=0, partition_sizes=[]",
];
let expected_optimized = vec![
"WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]",
" FilterExec: NOT non_nullable_col@1",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }], mode=[Sorted]",
- " SortExec: expr=[non_nullable_col@1 DESC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " CoalesceBatchesExec: target_batch_size=128",
+ " SortExec: expr=[non_nullable_col@1 DESC]",
+ " MemoryExec: partitions=0, partition_sizes=[]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
@@ -2892,4 +2898,8 @@ mod tests {
.unwrap(),
)
}
+
+ fn coalesce_batches_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+ Arc::new(CoalesceBatchesExec::new(input, 128))
+ }
}
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index 0ca01aacfa..d6b34e6bf1 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -107,6 +107,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
self.input.output_ordering()
}
+ fn maintains_input_order(&self) -> Vec<bool> {
+ vec![true]
+ }
+
fn equivalence_properties(&self) -> EquivalenceProperties {
self.input.equivalence_properties()
}