You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/31 14:47:43 UTC

[arrow-datafusion] branch master updated: Improve `get_meet_of_orderings` to check for common prefixes (#5111)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f7885bb4 Improve `get_meet_of_orderings` to check for common prefixes (#5111)
1f7885bb4 is described below

commit 1f7885bb48dd33ce7b9df995214393bbff080e08
Author: Mehmet Ozan Kabak <oz...@gmail.com>
AuthorDate: Tue Jan 31 09:47:36 2023 -0500

    Improve `get_meet_of_orderings` to check for common prefixes (#5111)
    
    * Improve get_meet_of_orderings to check for common prefixes among the given orderings
    
    * Incorporate review suggestions
---
 datafusion/core/src/physical_plan/common.rs | 197 ++++++++++++++++++++++++----
 datafusion/core/src/physical_plan/union.rs  |  21 +--
 2 files changed, 185 insertions(+), 33 deletions(-)

diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index 2515689e1..f7acfaed6 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -27,7 +27,6 @@ use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::ArrowError;
 use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
 use arrow::record_batch::RecordBatch;
-use datafusion_physical_expr::utils::ordering_satisfy;
 use datafusion_physical_expr::PhysicalSortExpr;
 use futures::{Future, Stream, StreamExt, TryStreamExt};
 use log::debug;
@@ -321,34 +320,37 @@ pub fn transpose<T>(original: Vec<Vec<T>>) -> Vec<Vec<T>> {
     }
 }
 
-/// Calculates the "meet" of children orderings
-/// The meet is the finest ordering that satisfied by all the input
+/// Calculates the "meet" of given orderings.
+/// The meet is the finest ordering that satisfied by all the given
 /// orderings, see https://en.wikipedia.org/wiki/Join_and_meet.
 pub fn get_meet_of_orderings(
-    children: &[Arc<dyn ExecutionPlan>],
+    given: &[Arc<dyn ExecutionPlan>],
 ) -> Option<&[PhysicalSortExpr]> {
-    // To find the meet, we first find the smallest input ordering.
-    let mut smallest: Option<&[PhysicalSortExpr]> = None;
-    for item in children.iter() {
-        if let Some(ordering) = item.output_ordering() {
-            smallest = match smallest {
-                None => Some(ordering),
-                Some(expr) if ordering.len() < expr.len() => Some(ordering),
-                _ => continue,
+    given
+        .iter()
+        .map(|item| item.output_ordering())
+        .collect::<Option<Vec<_>>>()
+        .and_then(get_meet_of_orderings_helper)
+}
+
+fn get_meet_of_orderings_helper(
+    orderings: Vec<&[PhysicalSortExpr]>,
+) -> Option<&[PhysicalSortExpr]> {
+    let mut idx = 0;
+    let first = orderings[0];
+    loop {
+        for ordering in orderings.iter() {
+            if idx >= ordering.len() {
+                return Some(ordering);
+            } else if ordering[idx] != first[idx] {
+                return if idx > 0 {
+                    Some(&ordering[..idx])
+                } else {
+                    None
+                };
             }
-        } else {
-            return None;
         }
-    }
-    // Check if the smallest ordering is a meet or not:
-    if children.iter().all(|child| {
-        ordering_satisfy(child.output_ordering(), smallest, || {
-            child.equivalence_properties()
-        })
-    }) {
-        smallest
-    } else {
-        None
+        idx += 1;
     }
 }
 
@@ -365,7 +367,152 @@ mod tests {
         datatypes::{DataType, Field, Schema},
         record_batch::RecordBatch,
     };
-    use datafusion_physical_expr::expressions::col;
+    use datafusion_physical_expr::expressions::{col, Column};
+
+    #[test]
+    fn get_meet_of_orderings_helper_common_prefix_test() -> Result<()> {
+        let input1: Vec<PhysicalSortExpr> = vec![
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("a", 0)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("b", 1)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("c", 2)),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let input2: Vec<PhysicalSortExpr> = vec![
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("a", 0)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("b", 1)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("y", 2)),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let input3: Vec<PhysicalSortExpr> = vec![
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("a", 0)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("x", 1)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("y", 2)),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let expected = vec![PhysicalSortExpr {
+            expr: Arc::new(Column::new("a", 0)),
+            options: SortOptions::default(),
+        }];
+
+        let result = get_meet_of_orderings_helper(vec![&input1, &input2, &input3]);
+        assert_eq!(result.unwrap(), expected);
+        Ok(())
+    }
+
+    #[test]
+    fn get_meet_of_orderings_helper_subset_test() -> Result<()> {
+        let input1: Vec<PhysicalSortExpr> = vec![
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("a", 0)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("b", 1)),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let input2: Vec<PhysicalSortExpr> = vec![
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("a", 0)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("b", 1)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("c", 2)),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let input3: Vec<PhysicalSortExpr> = vec![
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("a", 0)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("b", 1)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("d", 2)),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let result = get_meet_of_orderings_helper(vec![&input1, &input2, &input3]);
+        assert_eq!(result.unwrap(), input1);
+        Ok(())
+    }
+
+    #[test]
+    fn get_meet_of_orderings_helper_no_overlap_test() -> Result<()> {
+        let input1: Vec<PhysicalSortExpr> = vec![
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("a", 0)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("b", 1)),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let input2: Vec<PhysicalSortExpr> = vec![
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("x", 0)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("a", 1)),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let input3: Vec<PhysicalSortExpr> = vec![
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("a", 0)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("y", 1)),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let result = get_meet_of_orderings_helper(vec![&input1, &input2, &input3]);
+        assert!(result.is_none());
+        Ok(())
+    }
 
     #[test]
     fn test_meet_of_orderings() -> Result<()> {
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index 2f665e79c..3d4522272 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -47,7 +47,6 @@ use crate::{
     error::Result,
     physical_plan::{expressions, metrics::BaselineMetrics},
 };
-use datafusion_physical_expr::utils::ordering_satisfy;
 use tokio::macros::support::thread_rng_n;
 
 /// `UnionExec`: `UNION ALL` execution plan.
@@ -239,14 +238,20 @@ impl ExecutionPlan for UnionExec {
         // which is the "meet" of all input orderings. In this example, this
         // function will return vec![false, true, true], indicating that we
         // preserve the orderings for the 2nd and the 3rd children.
-        self.inputs()
-            .iter()
-            .map(|child| {
-                ordering_satisfy(self.output_ordering(), child.output_ordering(), || {
-                    child.equivalence_properties()
+        if let Some(output_ordering) = self.output_ordering() {
+            self.inputs()
+                .iter()
+                .map(|child| {
+                    if let Some(child_ordering) = child.output_ordering() {
+                        output_ordering.len() == child_ordering.len()
+                    } else {
+                        false
+                    }
                 })
-            })
-            .collect()
+                .collect()
+        } else {
+            vec![false; self.inputs().len()]
+        }
     }
 
     fn with_new_children(