You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/20 18:35:01 UTC

[arrow-datafusion] branch main updated: MINOR: Add maintains input order flag to CoalesceBatches (#6730)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new da9d2616d3 MINOR: Add maintains input order flag to CoalesceBatches (#6730)
da9d2616d3 is described below

commit da9d2616d38aefd949a3d96595d3e80778f9fe2a
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Tue Jun 20 21:34:55 2023 +0300

    MINOR: Add maintains input order flag to CoalesceBatches (#6730)
    
    * add maintains input order flag to CoalesceBatches
    
    * minor changes
---
 .../core/src/physical_optimizer/sort_enforcement.rs  | 20 +++++++++++++++-----
 .../core/src/physical_plan/coalesce_batches.rs       |  4 ++++
 2 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index a79552de49..727380adfc 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -971,6 +971,7 @@ mod tests {
     use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
     use crate::physical_plan::aggregates::PhysicalGroupBy;
     use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
+    use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
     use crate::physical_plan::filter::FilterExec;
     use crate::physical_plan::joins::utils::JoinOn;
     use crate::physical_plan::joins::SortMergeJoinExec;
@@ -1462,8 +1463,11 @@ mod tests {
             },
         )];
         let sort = sort_exec(sort_exprs.clone(), source);
+        // Add a dummy layer that propagates the Sort's ordering, to test that sort removal works across multiple layers
+        let coalesce_batches = coalesce_batches_exec(sort);
 
-        let window_agg = bounded_window_exec("non_nullable_col", sort_exprs, sort);
+        let window_agg =
+            bounded_window_exec("non_nullable_col", sort_exprs, coalesce_batches);
 
         let sort_exprs = vec![sort_expr_options(
             "non_nullable_col",
@@ -1491,16 +1495,18 @@ mod tests {
             "  FilterExec: NOT non_nullable_col@1",
             "    SortExec: expr=[non_nullable_col@1 ASC NULLS LAST]",
             "      BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }], mode=[Sorted]",
-            "        SortExec: expr=[non_nullable_col@1 DESC]",
-            "          MemoryExec: partitions=0, partition_sizes=[]",
+            "        CoalesceBatchesExec: target_batch_size=128",
+            "          SortExec: expr=[non_nullable_col@1 DESC]",
+            "            MemoryExec: partitions=0, partition_sizes=[]",
         ];
 
         let expected_optimized = vec![
             "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]",
             "  FilterExec: NOT non_nullable_col@1",
             "    BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }], mode=[Sorted]",
-            "      SortExec: expr=[non_nullable_col@1 DESC]",
-            "        MemoryExec: partitions=0, partition_sizes=[]",
+            "      CoalesceBatchesExec: target_batch_size=128",
+            "        SortExec: expr=[non_nullable_col@1 DESC]",
+            "          MemoryExec: partitions=0, partition_sizes=[]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -2892,4 +2898,8 @@ mod tests {
             .unwrap(),
         )
     }
+
+    fn coalesce_batches_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        Arc::new(CoalesceBatchesExec::new(input, 128))
+    }
 }
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index 0ca01aacfa..d6b34e6bf1 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -107,6 +107,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
         self.input.output_ordering()
     }
 
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![true]
+    }
+
     fn equivalence_properties(&self) -> EquivalenceProperties {
         self.input.equivalence_properties()
     }