You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/17 18:23:59 UTC

[arrow-datafusion] branch master updated: Do not resort inputs to `UnionExec` if they are already sorted (#4946)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 279440b2a Do not resort inputs to `UnionExec` if they are already sorted (#4946)
279440b2a is described below

commit 279440b2ab92d18675b8102e342d4d82182287dc
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Jan 17 19:23:53 2023 +0100

    Do not resort inputs to `UnionExec` if they are already sorted (#4946)
    
    * Do not resort inputs to Union if they are already sorted
    
    * Remove debugging
---
 .../src/physical_optimizer/sort_enforcement.rs     | 80 +++++++++++++++++++++-
 datafusion/core/src/physical_plan/union.rs         | 24 +++++++
 2 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 2d785e920..703a13a1c 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -486,15 +486,19 @@ fn check_alignment(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::datasource::listing::PartitionedFile;
+    use crate::datasource::object_store::ObjectStoreUrl;
     use crate::physical_plan::displayable;
+    use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
     use crate::physical_plan::filter::FilterExec;
     use crate::physical_plan::memory::MemoryExec;
     use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+    use crate::physical_plan::union::UnionExec;
     use crate::physical_plan::windows::create_window_expr;
     use crate::prelude::SessionContext;
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-    use datafusion_common::Result;
+    use datafusion_common::{Result, Statistics};
     use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction};
     use datafusion_physical_expr::expressions::{col, NotExpr};
     use datafusion_physical_expr::PhysicalSortExpr;
@@ -813,6 +817,33 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_union_inputs_sorted() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let sort = sort_exec(sort_exprs.clone(), source1);
+
+        let source2 = parquet_exec_sorted(&schema, sort_exprs.clone());
+
+        let union = union_exec(vec![source2, sort]);
+        let physical_plan = sort_preserving_merge_exec(sort_exprs, union);
+
+        // source2 is already sorted; source1 is sorted by an explicit SortExec.
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  UnionExec",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+        ];
+        // Both union inputs share one ordering: no sort is added, the plan is unchanged
+        let expected_optimized = expected_input.clone();
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
     /// make PhysicalSortExpr with default options
     fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
         sort_expr_options(name, schema, SortOptions::default())
@@ -856,4 +887,51 @@ mod tests {
     ) -> Arc<dyn ExecutionPlan> {
         Arc::new(FilterExec::try_new(predicate, input).unwrap())
     }
+
+    /// Create an unsorted `ParquetExec` scanning the single file "x" with `schema`
+    fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
+        Arc::new(ParquetExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+                file_schema: schema.clone(),
+                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering: None,
+                infinite_source: false,
+            },
+            None,
+            None,
+        ))
+    }
+
+    /// Create a `ParquetExec` over file "x" whose `output_ordering` is `sort_exprs`
+    fn parquet_exec_sorted(
+        schema: &SchemaRef,
+        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+    ) -> Arc<ParquetExec> {
+        let sort_exprs = sort_exprs.into_iter().collect();
+
+        Arc::new(ParquetExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+                file_schema: schema.clone(),
+                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering: Some(sort_exprs),
+                infinite_source: false,
+            },
+            None,
+            None,
+        ))
+    }
+    /// Wrap the `input` plans in a `UnionExec`
+    fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
+        Arc::new(UnionExec::new(input))
+    }
 }
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index 921a0d99f..a0fca8066 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -247,6 +247,30 @@ impl ExecutionPlan for UnionExec {
         }
     }
 
+    fn maintains_input_order(&self) -> bool {
+        // The `UnionExec` maintains the input order only if it is not
+        // partition aware and the ordering spec of every input is
+        // strictly equal to the ordering of the first input. This might
+        // be too strict when the input orderings are compatible but not
+        // exactly the same; see the comments in `output_ordering`.
+        // With no inputs, or an unordered first input, the order is not
+        // maintained.
+        if self.partition_aware {
+            return false;
+        }
+        let first_input_ordering =
+            match self.inputs.first().and_then(|plan| plan.output_ordering()) {
+                Some(ordering) => ordering,
+                None => return false,
+            };
+        self.inputs.iter().all(|plan| match plan.output_ordering() {
+            Some(ordering) => {
+                sort_expr_list_eq_strict_order(ordering, first_input_ordering)
+            }
+            None => false,
+        })
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,