You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/02/04 16:32:41 UTC
[arrow-datafusion] 01/01: Do not repartition the sorted inputs of `SortPreservingMergeExec`
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch alamb/less_repartitioing
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit c99e91e69aacdfb79d5b7eddf00b3f1597d3e87c
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Fri Feb 4 11:32:29 2022 -0500
Do not repartition the sorted inputs of `SortPreservingMergeExec`
---
datafusion/src/physical_optimizer/repartition.rs | 35 +++++++++++++++++++++-
.../physical_plan/sorts/sort_preserving_merge.rs | 6 ++++
2 files changed, 40 insertions(+), 1 deletion(-)
diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs
index 1f45053..6f6d678 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -109,11 +109,12 @@ mod tests {
use super::*;
use crate::datasource::PartitionedFile;
- use crate::physical_plan::expressions::col;
+ use crate::physical_plan::expressions::{col, PhysicalSortExpr};
use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::{displayable, Statistics};
use crate::test::object_store::TestObjectStore;
@@ -137,6 +138,17 @@ mod tests {
))
}
+ fn sort_preserving_merge_exec( // test helper: SortPreservingMergeExec over `input`, ordered by column c1
+ input: Arc<dyn ExecutionPlan>,
+ ) -> Arc<dyn ExecutionPlan> {
+ let expr = vec![PhysicalSortExpr {
+ expr: col("c1", &schema()).unwrap(),
+ options: arrow::compute::SortOptions::default(),
+ }];
+ // `expr` orders by c1 with default SortOptions, which the plan display renders as `[c1@0 ASC]`.
+ Arc::new(SortPreservingMergeExec::new(expr, input))
+ }
+ // Test helper (unchanged below): FilterExec over `input` using column c1 as the predicate.
fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(FilterExec::try_new(col("c1", &schema()).unwrap(), input).unwrap())
}
@@ -276,4 +288,25 @@ mod tests {
assert_eq!(&trim_plan_display(&plan), &expected);
Ok(())
}
+
+ #[test]
+ fn repartition_ignores_sort_preserving_merge() -> Result<()> { // SortPreservingMergeExec's input must survive Repartition untouched
+ let optimizer = Repartition {};
+ // Optimize SortPreservingMergeExec -> ParquetExec with a target of 5 partitions.
+ let optimized = optimizer.optimize(
+ sort_preserving_merge_exec(parquet_exec()),
+ &ExecutionConfig::new().with_target_partitions(5),
+ )?;
+ // Render the optimized plan as indented text so it can be compared line by line.
+ let plan = displayable(optimized.as_ref()).indent().to_string();
+ // The expected plan is the input plan itself: no repartitioning operator is inserted.
+ let expected = &[
+ "SortPreservingMergeExec: [c1@0 ASC]",
+ // Expect no repartition below SortPreservingMergeExec: its ParquetExec input is left as-is
+ "ParquetExec: limit=None, partitions=[x]",
+ ];
+
+ assert_eq!(&trim_plan_display(&plan), &expected);
+ Ok(())
+ }
}
diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
index ddc9ff1..d015e0c 100644
--- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -128,6 +128,12 @@ impl ExecutionPlan for SortPreservingMergeExec {
Distribution::UnspecifiedDistribution
}
+ fn should_repartition_children(&self) -> bool {
+ // Repartitioning the children could leave their output no longer
+ // sorted, so this node opts out of child repartitioning.
+ false // covered by the `repartition_ignores_sort_preserving_merge` optimizer test
+ }
+ // The single input plan is this node's only child.
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}