You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this version.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/28 18:21:21 UTC
[arrow-datafusion] branch master updated: Teach optimizer that `CoalesceBatchesExec` does not destroy sort order (#4332)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new a31b44eae Teach optimizer that `CoalesceBatchesExec` does not destroy sort order (#4332)
a31b44eae is described below
commit a31b44eae236920d43ddd3ef7fadf8f9abf18976
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Mon Nov 28 13:21:14 2022 -0500
Teach optimizer that `CoalesceBatchesExec` does not destroy sort order (#4332)
---
.../core/src/physical_optimizer/enforcement.rs | 39 +++++++++++++++++++++-
.../core/src/physical_plan/coalesce_batches.rs | 3 +-
2 files changed, 40 insertions(+), 2 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index 2a8252da8..6f6c504d0 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -1027,7 +1027,9 @@ impl TreeNodeRewritable for PlanWithKeyRequirements {
#[cfg(test)]
mod tests {
+ use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
use crate::physical_plan::filter::FilterExec;
+ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_expr::logical_plan::JoinType;
@@ -1064,6 +1066,12 @@ mod tests {
}
fn parquet_exec() -> Arc<ParquetExec> {
+ parquet_exec_with_sort(None)
+ }
+
+ fn parquet_exec_with_sort(
+ output_ordering: Option<Vec<PhysicalSortExpr>>,
+ ) -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
@@ -1074,7 +1082,7 @@ mod tests {
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
- output_ordering: None,
+ output_ordering,
},
None,
None,
@@ -2139,4 +2147,33 @@ mod tests {
assert_optimized!(expected, join);
Ok(())
}
+
+ #[test]
+ fn merge_does_not_need_sort() -> Result<()> {
+ // see https://github.com/apache/arrow-datafusion/issues/4331
+ let schema = schema();
+ let sort_key = vec![PhysicalSortExpr {
+ expr: col("a", &schema)?,
+ options: SortOptions::default(),
+ }];
+
+ // Scan some sorted parquet files
+ let exec = parquet_exec_with_sort(Some(sort_key.clone()));
+
+ // CoalesceBatchesExec (as placed after a filter) must keep the input's sort order
+ let exec = Arc::new(CoalesceBatchesExec::new(exec, 4096));
+
+ // Merge from multiple parquet files and keep the data sorted
+ let exec = Arc::new(SortPreservingMergeExec::new(sort_key, exec));
+
+ // The optimizer should not add an additional SortExec as the
+ // data is already sorted
+ let expected = &[
+ "SortPreservingMergeExec: [a@0 ASC]",
+ "CoalesceBatchesExec: target_batch_size=4096",
+ "ParquetExec: limit=None, partitions=[x], output_ordering=[a@0 ASC], projection=[a, b, c, d, e]",
+ ];
+ assert_optimized!(expected, exec);
+ Ok(())
+ }
}
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index e7c492732..762bd9092 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -97,7 +97,8 @@ impl ExecutionPlan for CoalesceBatchesExec {
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
+ // Coalescing batches does not reorder rows, so the input's sort order is reported unchanged
+ self.input.output_ordering()
}
fn equivalence_properties(&self) -> EquivalenceProperties {