You are viewing a plain-text version of this content. The link to its canonical version was a hyperlink and was not preserved in this plain-text conversion.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/31 14:47:43 UTC
[arrow-datafusion] branch master updated: Improve `get_meet_of_orderings` to check for common prefixes (#5111)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 1f7885bb4 Improve `get_meet_of_orderings` to check for common prefixes (#5111)
1f7885bb4 is described below
commit 1f7885bb48dd33ce7b9df995214393bbff080e08
Author: Mehmet Ozan Kabak <oz...@gmail.com>
AuthorDate: Tue Jan 31 09:47:36 2023 -0500
Improve `get_meet_of_orderings` to check for common prefixes (#5111)
* Improve get_meet_of_orderings to check for common prefixes among the given orderings
* Incorporate review suggestions
---
datafusion/core/src/physical_plan/common.rs | 197 ++++++++++++++++++++++++----
datafusion/core/src/physical_plan/union.rs | 21 +--
2 files changed, 185 insertions(+), 33 deletions(-)
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index 2515689e1..f7acfaed6 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -27,7 +27,6 @@ use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
use arrow::record_batch::RecordBatch;
-use datafusion_physical_expr::utils::ordering_satisfy;
use datafusion_physical_expr::PhysicalSortExpr;
use futures::{Future, Stream, StreamExt, TryStreamExt};
use log::debug;
@@ -321,34 +320,37 @@ pub fn transpose<T>(original: Vec<Vec<T>>) -> Vec<Vec<T>> {
}
}
-/// Calculates the "meet" of children orderings
-/// The meet is the finest ordering that satisfied by all the input
+/// Calculates the "meet" of given orderings.
+/// The meet is the finest ordering that satisfied by all the given
/// orderings, see https://en.wikipedia.org/wiki/Join_and_meet.
pub fn get_meet_of_orderings(
- children: &[Arc<dyn ExecutionPlan>],
+ given: &[Arc<dyn ExecutionPlan>],
) -> Option<&[PhysicalSortExpr]> {
- // To find the meet, we first find the smallest input ordering.
- let mut smallest: Option<&[PhysicalSortExpr]> = None;
- for item in children.iter() {
- if let Some(ordering) = item.output_ordering() {
- smallest = match smallest {
- None => Some(ordering),
- Some(expr) if ordering.len() < expr.len() => Some(ordering),
- _ => continue,
+ given
+ .iter()
+ .map(|item| item.output_ordering())
+ .collect::<Option<Vec<_>>>()
+ .and_then(get_meet_of_orderings_helper)
+}
+
+fn get_meet_of_orderings_helper(
+ orderings: Vec<&[PhysicalSortExpr]>,
+) -> Option<&[PhysicalSortExpr]> {
+ let mut idx = 0;
+ let first = orderings[0];
+ loop {
+ for ordering in orderings.iter() {
+ if idx >= ordering.len() {
+ return Some(ordering);
+ } else if ordering[idx] != first[idx] {
+ return if idx > 0 {
+ Some(&ordering[..idx])
+ } else {
+ None
+ };
}
- } else {
- return None;
}
- }
- // Check if the smallest ordering is a meet or not:
- if children.iter().all(|child| {
- ordering_satisfy(child.output_ordering(), smallest, || {
- child.equivalence_properties()
- })
- }) {
- smallest
- } else {
- None
+ idx += 1;
}
}
@@ -365,7 +367,152 @@ mod tests {
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
- use datafusion_physical_expr::expressions::col;
+ use datafusion_physical_expr::expressions::{col, Column};
+
+ #[test]
+ fn get_meet_of_orderings_helper_common_prefix_test() -> Result<()> {
+ let input1: Vec<PhysicalSortExpr> = vec![
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ options: SortOptions::default(),
+ },
+ ];
+
+ let input2: Vec<PhysicalSortExpr> = vec![
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("y", 2)),
+ options: SortOptions::default(),
+ },
+ ];
+
+ let input3: Vec<PhysicalSortExpr> = vec![
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("x", 1)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("y", 2)),
+ options: SortOptions::default(),
+ },
+ ];
+
+ let expected = vec![PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ options: SortOptions::default(),
+ }];
+
+ let result = get_meet_of_orderings_helper(vec![&input1, &input2, &input3]);
+ assert_eq!(result.unwrap(), expected);
+ Ok(())
+ }
+
+ #[test]
+ fn get_meet_of_orderings_helper_subset_test() -> Result<()> {
+ let input1: Vec<PhysicalSortExpr> = vec![
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ options: SortOptions::default(),
+ },
+ ];
+
+ let input2: Vec<PhysicalSortExpr> = vec![
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ options: SortOptions::default(),
+ },
+ ];
+
+ let input3: Vec<PhysicalSortExpr> = vec![
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("d", 2)),
+ options: SortOptions::default(),
+ },
+ ];
+
+ let result = get_meet_of_orderings_helper(vec![&input1, &input2, &input3]);
+ assert_eq!(result.unwrap(), input1);
+ Ok(())
+ }
+
+ #[test]
+ fn get_meet_of_orderings_helper_no_overlap_test() -> Result<()> {
+ let input1: Vec<PhysicalSortExpr> = vec![
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ options: SortOptions::default(),
+ },
+ ];
+
+ let input2: Vec<PhysicalSortExpr> = vec![
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("x", 0)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 1)),
+ options: SortOptions::default(),
+ },
+ ];
+
+ let input3: Vec<PhysicalSortExpr> = vec![
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("y", 1)),
+ options: SortOptions::default(),
+ },
+ ];
+
+ let result = get_meet_of_orderings_helper(vec![&input1, &input2, &input3]);
+ assert!(result.is_none());
+ Ok(())
+ }
#[test]
fn test_meet_of_orderings() -> Result<()> {
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index 2f665e79c..3d4522272 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -47,7 +47,6 @@ use crate::{
error::Result,
physical_plan::{expressions, metrics::BaselineMetrics},
};
-use datafusion_physical_expr::utils::ordering_satisfy;
use tokio::macros::support::thread_rng_n;
/// `UnionExec`: `UNION ALL` execution plan.
@@ -239,14 +238,20 @@ impl ExecutionPlan for UnionExec {
// which is the "meet" of all input orderings. In this example, this
// function will return vec![false, true, true], indicating that we
// preserve the orderings for the 2nd and the 3rd children.
- self.inputs()
- .iter()
- .map(|child| {
- ordering_satisfy(self.output_ordering(), child.output_ordering(), || {
- child.equivalence_properties()
+ if let Some(output_ordering) = self.output_ordering() {
+ self.inputs()
+ .iter()
+ .map(|child| {
+ if let Some(child_ordering) = child.output_ordering() {
+ output_ordering.len() == child_ordering.len()
+ } else {
+ false
+ }
})
- })
- .collect()
+ .collect()
+ } else {
+ vec![false; self.inputs().len()]
+ }
}
fn with_new_children(