You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/28 18:21:21 UTC

[arrow-datafusion] branch master updated: Teach optimizer that `CoalesceBatchesExec` does not destroy sort order (#4332)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new a31b44eae Teach optimizer that `CoalesceBatchesExec` does not destroy sort order (#4332)
a31b44eae is described below

commit a31b44eae236920d43ddd3ef7fadf8f9abf18976
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Mon Nov 28 13:21:14 2022 -0500

    Teach optimizer that `CoalesceBatchesExec` does not destroy sort order (#4332)
---
 .../core/src/physical_optimizer/enforcement.rs     | 39 +++++++++++++++++++++-
 .../core/src/physical_plan/coalesce_batches.rs     |  3 +-
 2 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index 2a8252da8..6f6c504d0 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -1027,7 +1027,9 @@ impl TreeNodeRewritable for PlanWithKeyRequirements {
 
 #[cfg(test)]
 mod tests {
+    use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
     use crate::physical_plan::filter::FilterExec;
+    use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use datafusion_expr::logical_plan::JoinType;
@@ -1064,6 +1066,12 @@ mod tests {
     }
 
     fn parquet_exec() -> Arc<ParquetExec> {
+        parquet_exec_with_sort(None)
+    }
+
+    fn parquet_exec_with_sort(
+        output_ordering: Option<Vec<PhysicalSortExpr>>,
+    ) -> Arc<ParquetExec> {
         Arc::new(ParquetExec::new(
             FileScanConfig {
                 object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
@@ -1074,7 +1082,7 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 config_options: ConfigOptions::new().into_shareable(),
-                output_ordering: None,
+                output_ordering,
             },
             None,
             None,
@@ -2139,4 +2147,33 @@ mod tests {
         assert_optimized!(expected, join);
         Ok(())
     }
+
+    #[test]
+    fn merge_does_not_need_sort() -> Result<()> {
+        // see https://github.com/apache/arrow-datafusion/issues/4331
+        let schema = schema();
+        let sort_key = vec![PhysicalSortExpr {
+            expr: col("a", &schema).unwrap(),
+            options: SortOptions::default(),
+        }];
+
+        // Scan some sorted parquet files
+        let exec = parquet_exec_with_sort(Some(sort_key.clone()));
+
+        // CoalesceBatchesExec to mimic behavior after a filter
+        let exec = Arc::new(CoalesceBatchesExec::new(exec, 4096));
+
+        // Merge from multiple parquet files and keep the data sorted
+        let exec = Arc::new(SortPreservingMergeExec::new(sort_key, exec));
+
+        // The optimizer should not add an additional SortExec as the
+        // data is already sorted
+        let expected = &[
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "CoalesceBatchesExec: target_batch_size=4096",
+            "ParquetExec: limit=None, partitions=[x], output_ordering=[a@0 ASC], projection=[a, b, c, d, e]",
+        ];
+        assert_optimized!(expected, exec);
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index e7c492732..762bd9092 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -97,7 +97,8 @@ impl ExecutionPlan for CoalesceBatchesExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+        // The coalesce batches operator does not make any changes to the sorting of its input
+        self.input.output_ordering()
     }
 
     fn equivalence_properties(&self) -> EquivalenceProperties {