You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@arrow.apache.org by al...@apache.org on 2022/02/04 16:32:41 UTC

[arrow-datafusion] 01/01: Do not repartition the sorted inputs of `SortPreservingMerge`

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch alamb/less_repartitioing
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit c99e91e69aacdfb79d5b7eddf00b3f1597d3e87c
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Fri Feb 4 11:32:29 2022 -0500

    Do not repartition the sorted inputs of `SortPreservingMerge`
---
 datafusion/src/physical_optimizer/repartition.rs   | 35 +++++++++++++++++++++-
 .../physical_plan/sorts/sort_preserving_merge.rs   |  6 ++++
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs
index 1f45053..6f6d678 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -109,11 +109,12 @@ mod tests {
 
     use super::*;
     use crate::datasource::PartitionedFile;
-    use crate::physical_plan::expressions::col;
+    use crate::physical_plan::expressions::{col, PhysicalSortExpr};
     use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
     use crate::physical_plan::filter::FilterExec;
     use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
     use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+    use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
     use crate::physical_plan::union::UnionExec;
     use crate::physical_plan::{displayable, Statistics};
     use crate::test::object_store::TestObjectStore;
@@ -137,6 +138,17 @@ mod tests {
         ))
     }
 
+    fn sort_preserving_merge_exec(
+        input: Arc<dyn ExecutionPlan>,
+    ) -> Arc<dyn ExecutionPlan> {
+        let expr = vec![PhysicalSortExpr {
+            expr: col("c1", &schema()).unwrap(),
+            options: arrow::compute::SortOptions::default(),
+        }];
+        // Wrap `input` in a SortPreservingMergeExec ordered by c1 (ASC, default options)
+        Arc::new(SortPreservingMergeExec::new(expr, input))
+    }
+
     fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
         Arc::new(FilterExec::try_new(col("c1", &schema()).unwrap(), input).unwrap())
     }
@@ -276,4 +288,25 @@ mod tests {
         assert_eq!(&trim_plan_display(&plan), &expected);
         Ok(())
     }
+
+    #[test]
+    fn repartition_ignores_sort_preserving_merge() -> Result<()> {
+        let optimizer = Repartition {};
+
+        let optimized = optimizer.optimize(
+            sort_preserving_merge_exec(parquet_exec()),
+            &ExecutionConfig::new().with_target_partitions(5),
+        )?;
+
+        let plan = displayable(optimized.as_ref()).indent().to_string();
+
+        let expected = &[
+            "SortPreservingMergeExec: [c1@0 ASC]",
+            // Expect no RepartitionExec between SortPreservingMergeExec and its input
+            "ParquetExec: limit=None, partitions=[x]",
+        ];
+
+        assert_eq!(&trim_plan_display(&plan), &expected);
+        Ok(())
+    }
 }
diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
index ddc9ff1..d015e0c 100644
--- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -128,6 +128,12 @@ impl ExecutionPlan for SortPreservingMergeExec {
         Distribution::UnspecifiedDistribution
     }
 
+    fn should_repartition_children(&self) -> bool {
+        // Repartitioning the children may leave their output unsorted, but
+        // this merge expects sorted inputs, so opt out of repartitioning.
+        false
+    }
+
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         vec![self.input.clone()]
     }