You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/17 18:23:59 UTC
[arrow-datafusion] branch master updated: Do not resort inputs to `UnionExec` if they are already sorted (#4946)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 279440b2a Do not resort inputs to `UnionExec` if they are already sorted (#4946)
279440b2a is described below
commit 279440b2ab92d18675b8102e342d4d82182287dc
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Jan 17 19:23:53 2023 +0100
Do not resort inputs to `UnionExec` if they are already sorted (#4946)
* Do not resort inputs to Union if they are already sorted
* Remove debugging
---
.../src/physical_optimizer/sort_enforcement.rs | 80 +++++++++++++++++++++-
datafusion/core/src/physical_plan/union.rs | 24 +++++++
2 files changed, 103 insertions(+), 1 deletion(-)
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 2d785e920..703a13a1c 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -486,15 +486,19 @@ fn check_alignment(
#[cfg(test)]
mod tests {
use super::*;
+ use crate::datasource::listing::PartitionedFile;
+ use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_plan::displayable;
+ use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+ use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::create_window_expr;
use crate::prelude::SessionContext;
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
- use datafusion_common::Result;
+ use datafusion_common::{Result, Statistics};
use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction};
use datafusion_physical_expr::expressions::{col, NotExpr};
use datafusion_physical_expr::PhysicalSortExpr;
@@ -813,6 +817,33 @@ mod tests {
Ok(())
}
+    /// A union whose inputs all produce the same ordering must not get an extra
+    /// sort inserted above it by the sort-enforcement rule.
+    #[tokio::test]
+    async fn test_union_inputs_sorted() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        // First union input: an unsorted scan with an explicit SortExec on top.
+        let source1 = parquet_exec(&schema);
+        let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let sort = sort_exec(sort_exprs.clone(), source1);
+
+        // Second union input: a scan that already declares the same output ordering.
+        let source2 = parquet_exec_sorted(&schema, sort_exprs.clone());
+
+        let union = union_exec(vec![source2, sort]);
+        let physical_plan = sort_preserving_merge_exec(sort_exprs, union);
+
+        // Both union inputs produce `nullable_col ASC`: one because the scan declares
+        // it, the other through an explicit SortExec. Each plan level is indented by
+        // two spaces.
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  UnionExec",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+        ];
+        // No sort should be added at the output of the union: the optimized plan must
+        // be identical to the input plan.
+        let expected_optimized = expected_input.clone();
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
/// make PhysicalSortExpr with default options
fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
sort_expr_options(name, schema, SortOptions::default())
@@ -856,4 +887,51 @@ mod tests {
) -> Arc<dyn ExecutionPlan> {
Arc::new(FilterExec::try_new(predicate, input).unwrap())
}
+
+    /// Create a non-sorted parquet exec (no declared `output_ordering`).
+    fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
+        parquet_exec_with_ordering(schema, None)
+    }
+
+    /// Create a parquet exec whose output is declared to be ordered by `sort_exprs`.
+    fn parquet_exec_sorted(
+        schema: &SchemaRef,
+        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+    ) -> Arc<ParquetExec> {
+        let sort_exprs = sort_exprs.into_iter().collect();
+        parquet_exec_with_ordering(schema, Some(sort_exprs))
+    }
+
+    /// Shared builder for the test parquet scans.
+    ///
+    /// The scan reads one file group containing the single file `x`, using default
+    /// statistics, no projection, no limit and no partition columns. Only the
+    /// declared `output_ordering` varies between callers.
+    fn parquet_exec_with_ordering(
+        schema: &SchemaRef,
+        output_ordering: Option<Vec<PhysicalSortExpr>>,
+    ) -> Arc<ParquetExec> {
+        Arc::new(ParquetExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+                file_schema: schema.clone(),
+                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering,
+                infinite_source: false,
+            },
+            None,
+            None,
+        ))
+    }
+
+    /// Wrap `input` in a [`UnionExec`] and return it as a type-erased plan node.
+    fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
+        let union = UnionExec::new(input);
+        Arc::new(union)
+    }
}
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index 921a0d99f..a0fca8066 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -247,6 +247,30 @@ impl ExecutionPlan for UnionExec {
}
}
+    /// Returns `true` when the union is not partition aware and every input reports
+    /// an output ordering strictly equal to the first input's ordering.
+    ///
+    /// This may be stricter than necessary: inputs whose orderings are compatible
+    /// but not identical are reported as not maintaining order. See the comments in
+    /// `output_ordering`.
+    ///
+    /// A union with no inputs is reported as not maintaining order, rather than
+    /// panicking on an out-of-bounds `inputs[0]`.
+    fn maintains_input_order(&self) -> bool {
+        if self.partition_aware {
+            return false;
+        }
+        // The first input's ordering is the reference every input must match.
+        let first_input_ordering =
+            match self.inputs.first().and_then(|plan| plan.output_ordering()) {
+                Some(ordering) => ordering,
+                None => return false,
+            };
+        // Every input (including the first) must have an ordering that strictly
+        // equals the reference; an input without an ordering fails the check.
+        self.inputs.iter().all(|plan| {
+            plan.output_ordering().map_or(false, |ordering| {
+                sort_expr_list_eq_strict_order(ordering, first_input_ordering)
+            })
+        })
+    }
+
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,