You are viewing a plain-text version of this content; the link to the canonical version is not preserved in plain text.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/02/04 16:32:40 UTC

[arrow-datafusion] branch alamb/less_repartitioing created (now c99e91e)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a change to branch alamb/less_repartitioing
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git.


      at c99e91e  Do not repartition sorted inputs `SortPreservingMerge`

This branch includes the following new commits:

     new c99e91e  Do not repartition sorted inputs `SortPreservingMerge`

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[arrow-datafusion] 01/01: Do not repartition sorted inputs `SortPreservingMerge`

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch alamb/less_repartitioing
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit c99e91e69aacdfb79d5b7eddf00b3f1597d3e87c
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Fri Feb 4 11:32:29 2022 -0500

    Do not repartition sorted inputs `SortPreservingMerge`
---
 datafusion/src/physical_optimizer/repartition.rs   | 35 +++++++++++++++++++++-
 .../physical_plan/sorts/sort_preserving_merge.rs   |  6 ++++
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs
index 1f45053..6f6d678 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -109,11 +109,12 @@ mod tests {
 
     use super::*;
     use crate::datasource::PartitionedFile;
-    use crate::physical_plan::expressions::col;
+    use crate::physical_plan::expressions::{col, PhysicalSortExpr};
     use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
     use crate::physical_plan::filter::FilterExec;
     use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
     use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+    use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
     use crate::physical_plan::union::UnionExec;
     use crate::physical_plan::{displayable, Statistics};
     use crate::test::object_store::TestObjectStore;
@@ -137,6 +138,17 @@ mod tests {
         ))
     }
 
+    fn sort_preserving_merge_exec(
+        input: Arc<dyn ExecutionPlan>,
+    ) -> Arc<dyn ExecutionPlan> {
+        // Merge `input` on column `c1` using the default `SortOptions`.
+        let sort_key = PhysicalSortExpr {
+            expr: col("c1", &schema()).unwrap(),
+            options: arrow::compute::SortOptions::default(),
+        };
+        Arc::new(SortPreservingMergeExec::new(vec![sort_key], input))
+    }
+
     fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
         Arc::new(FilterExec::try_new(col("c1", &schema()).unwrap(), input).unwrap())
     }
@@ -276,4 +288,25 @@ mod tests {
         assert_eq!(&trim_plan_display(&plan), &expected);
         Ok(())
     }
+
+    #[test]
+    fn repartition_ignores_sort_preserving_merge() -> Result<()> {
+        // Repartitioning the input of a SortPreservingMergeExec could leave it
+        // unsorted, so the optimizer must keep the plan unchanged even when
+        // more target partitions are requested.
+        let input = sort_preserving_merge_exec(parquet_exec());
+        let config = ExecutionConfig::new().with_target_partitions(5);
+        let optimized = Repartition {}.optimize(input, &config)?;
+
+        let expected = &[
+            "SortPreservingMergeExec: [c1@0 ASC]",
+            // no repartitioning may be inserted between the merge and the scan
+            "ParquetExec: limit=None, partitions=[x]",
+        ];
+
+        let plan = displayable(optimized.as_ref()).indent().to_string();
+        assert_eq!(&trim_plan_display(&plan), &expected);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
index ddc9ff1..d015e0c 100644
--- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -128,6 +128,12 @@ impl ExecutionPlan for SortPreservingMergeExec {
         Distribution::UnspecifiedDistribution
     }
 
+    fn should_repartition_children(&self) -> bool {
+        // Opt out of child repartitioning: repartitioned children may no
+        // longer be sorted, and this merge relies on its inputs being sorted.
+        false
+    }
+
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         vec![self.input.clone()]
     }