You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ak...@apache.org on 2023/05/12 21:36:11 UTC
[arrow-datafusion] branch main updated: Ordering satisfy consider ordering equivalence of different lengths (#6330)
This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4b21b61559 Ordering satisfy consider ordering equivalence of different lengths (#6330)
4b21b61559 is described below
commit 4b21b61559f2f6511736a7b274f681aaa088b111
Author: Berkay Şahin <12...@users.noreply.github.com>
AuthorDate: Sat May 13 00:36:04 2023 +0300
Ordering satisfy consider ordering equivalence of different lengths (#6330)
* Length check is postponed, and compression step is added.
* Update/Add comments, minor changes
* Various clean-ups and clone removals
* Simplifications
* Simplify code structure
---------
Co-authored-by: Mustafa Akur <mu...@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
.../core/src/physical_optimizer/sort_pushdown.rs | 2 +-
datafusion/core/src/physical_plan/windows/mod.rs | 2 +-
.../core/tests/sqllogictests/test_files/window.slt | 25 ++
datafusion/physical-expr/src/equivalence.rs | 8 +-
datafusion/physical-expr/src/sort_expr.rs | 45 +--
datafusion/physical-expr/src/utils.rs | 382 ++++++++++++---------
6 files changed, 269 insertions(+), 195 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
index a0490fa64e..20a5038b7a 100644
--- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -397,7 +397,7 @@ fn shift_right_required(
let new_right_required: Vec<PhysicalSortRequirement> = parent_required
.iter()
.filter_map(|r| {
- let Some(col) = r.expr().as_any().downcast_ref::<Column>() else {
+ let Some(col) = r.expr.as_any().downcast_ref::<Column>() else {
return None;
};
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index 40ef8a3e6a..f6fe3bcaee 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -212,7 +212,7 @@ pub(crate) fn calc_requirements<
.collect::<Vec<_>>();
for element in orderby_sort_exprs.into_iter() {
let PhysicalSortExpr { expr, options } = element.borrow();
- if !sort_reqs.iter().any(|e| e.expr().eq(expr)) {
+ if !sort_reqs.iter().any(|e| e.expr.eq(expr)) {
sort_reqs.push(PhysicalSortRequirement::new(expr.clone(), Some(*options)));
}
}
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 9790871ff2..02c5e6cdb0 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -2379,6 +2379,31 @@ select row_number() over (rows between null preceding and current row) from (sel
statement error DataFusion error: Error during planning: Invalid window frame: frame offsets must be non negative integers
select row_number() over (rows between current row and -1 following) from (select 1 a) x
+# This test shows that ordering satisfy considers ordering equivalences,
+# and can simplify (reduce expression size) multi expression requirements during normalization.
+# For the example below, the requirement rn1 ASC, c9 DESC should be simplified to rn1 ASC.
+# Hence in the final plan we shouldn't see SortExec for the global ORDER BY.
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+ ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+ FROM aggregate_test_100
+ ORDER BY c9 DESC)
+ ORDER BY rn1, c9 DESC
+ LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+ Sort: rn1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST, fetch=5
+ Sort: aggregate_test_100.c9 DESC NULLS FIRST
+ Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+ WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+ TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+ ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
+ BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+ SortExec: expr=[c9@0 DESC]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
# The following query has type error. We should test the error could be detected
# from either the logical plan (when `skip_failed_rules` is set to `false`) or
diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs
index d46f56e46a..ab3443424b 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -52,8 +52,8 @@ impl<T: Eq + Hash + Clone> EquivalenceProperties<T> {
}
}
- /// Add new equal conditions into the EquivalenceProperties, the new equal conditions are usually comming from the
- /// equality predicates in Join or Filter
+ /// Adds new equal conditions into the EquivalenceProperties. New equal
+ /// conditions usually come from equality predicates in a join/filter.
pub fn add_equal_conditions(&mut self, new_conditions: (&T, &T)) {
let mut idx1: Option<usize> = None;
let mut idx2: Option<usize> = None;
@@ -153,7 +153,7 @@ impl<T: Eq + Hash + Clone> EquivalentClass<T> {
}
pub fn insert(&mut self, col: T) -> bool {
- self.others.insert(col)
+ self.head != col && self.others.insert(col)
}
pub fn remove(&mut self, col: &T) -> bool {
@@ -168,7 +168,7 @@ impl<T: Eq + Hash + Clone> EquivalentClass<T> {
false
}
} else {
- true
+ removed
}
}
diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs
index f19ae67004..78cdd87db2 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr/src/sort_expr.rs
@@ -100,15 +100,26 @@ impl PhysicalSortExpr {
#[derive(Clone, Debug)]
pub struct PhysicalSortRequirement {
/// Physical expression representing the column to sort
- expr: Arc<dyn PhysicalExpr>,
+ pub expr: Arc<dyn PhysicalExpr>,
/// Option to specify how the given column should be sorted.
/// If unspecified, there are no constraints on sort options.
- options: Option<SortOptions>,
+ pub options: Option<SortOptions>,
}
impl From<PhysicalSortRequirement> for PhysicalSortExpr {
+ /// If options is `None`, the default sort options `ASC, NULLS LAST` is used.
+ ///
+ /// The default is picked to be consistent with
+ /// PostgreSQL: <https://www.postgresql.org/docs/current/queries-order.html>
fn from(value: PhysicalSortRequirement) -> Self {
- value.into_sort_expr()
+ let options = value.options.unwrap_or(SortOptions {
+ descending: false,
+ nulls_first: false,
+ });
+ PhysicalSortExpr {
+ expr: value.expr,
+ options,
+ }
}
}
@@ -151,22 +162,6 @@ impl PhysicalSortRequirement {
self
}
- /// Converts the `PhysicalSortRequirement` to `PhysicalSortExpr`.
- /// If required ordering is `None` for an entry, the default
- /// ordering `ASC, NULLS LAST` is used.
- ///
- /// The default is picked to be consistent with
- /// PostgreSQL: <https://www.postgresql.org/docs/current/queries-order.html>
- pub fn into_sort_expr(self) -> PhysicalSortExpr {
- let Self { expr, options } = self;
-
- let options = options.unwrap_or(SortOptions {
- descending: false,
- nulls_first: false,
- });
- PhysicalSortExpr { expr, options }
- }
-
/// Returns whether this requirement is equal or more specific than `other`.
pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool {
self.expr.eq(&other.expr)
@@ -197,7 +192,7 @@ impl PhysicalSortRequirement {
///
/// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
/// for each entry in the input. If required ordering is None for an entry
- /// default ordering `ASC, NULLS LAST` if given (see [`Self::into_sort_expr`])
+ /// default ordering `ASC, NULLS LAST` is used (see `PhysicalSortExpr::from`).
pub fn to_sort_exprs(
requirements: impl IntoIterator<Item = PhysicalSortRequirement>,
) -> Vec<PhysicalSortExpr> {
@@ -206,16 +201,6 @@ impl PhysicalSortRequirement {
.map(PhysicalSortExpr::from)
.collect()
}
-
- /// Returns the expr for this requirement
- pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
- &self.expr
- }
-
- /// Returns the required options, for this requirement
- pub fn options(&self) -> Option<SortOptions> {
- self.options
- }
}
/// Returns the SQL string representation of the given [SortOptions] object.
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index 3b186ddf8a..967ecdb40f 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -124,10 +124,7 @@ pub fn normalize_out_expr_with_columns_map(
) -> Arc<dyn PhysicalExpr> {
expr.clone()
.transform(&|expr| {
- let normalized_form: Option<Arc<dyn PhysicalExpr>> = match expr
- .as_any()
- .downcast_ref::<Column>()
- {
+ let normalized_form = match expr.as_any().downcast_ref::<Column>() {
Some(column) => columns_map
.get(column)
.map(|c| Arc::new(c[0].clone()) as _)
@@ -149,7 +146,7 @@ pub fn normalize_expr_with_equivalence_properties(
) -> Arc<dyn PhysicalExpr> {
expr.clone()
.transform(&|expr| {
- let normalized_form: Option<Arc<dyn PhysicalExpr>> =
+ let normalized_form =
expr.as_any().downcast_ref::<Column>().and_then(|column| {
for class in eq_properties {
if class.contains(column) {
@@ -167,56 +164,23 @@ pub fn normalize_expr_with_equivalence_properties(
.unwrap_or(expr)
}
-fn normalize_sort_expr_with_equivalence_properties(
- sort_expr: PhysicalSortExpr,
- eq_properties: &[EquivalentClass],
-) -> PhysicalSortExpr {
- let normalized_expr =
- normalize_expr_with_equivalence_properties(sort_expr.expr.clone(), eq_properties);
-
- if sort_expr.expr.ne(&normalized_expr) {
- PhysicalSortExpr {
- expr: normalized_expr,
- options: sort_expr.options,
- }
- } else {
- sort_expr
- }
-}
-
-fn normalize_sort_requirement_with_equivalence_properties(
- sort_requirement: PhysicalSortRequirement,
- eq_properties: &[EquivalentClass],
-) -> PhysicalSortRequirement {
- let normalized_expr = normalize_expr_with_equivalence_properties(
- sort_requirement.expr().clone(),
- eq_properties,
- );
- if sort_requirement.expr().ne(&normalized_expr) {
- sort_requirement.with_expr(normalized_expr)
- } else {
- sort_requirement
- }
-}
-
fn normalize_expr_with_ordering_equivalence_properties(
expr: Arc<dyn PhysicalExpr>,
- sort_options: Option<SortOptions>,
+ sort_options: SortOptions,
eq_properties: &[OrderingEquivalentClass],
-) -> Arc<dyn PhysicalExpr> {
- expr.clone()
+) -> (Arc<dyn PhysicalExpr>, SortOptions) {
+ let normalized_expr = expr
+ .clone()
.transform(&|expr| {
let normalized_form =
expr.as_any().downcast_ref::<Column>().and_then(|column| {
- if let Some(options) = sort_options {
- for class in eq_properties {
- let ordered_column = OrderedColumn {
- col: column.clone(),
- options,
- };
- if class.contains(&ordered_column) {
- return Some(class.head().clone());
- }
+ for class in eq_properties {
+ let ordered_column = OrderedColumn {
+ col: column.clone(),
+ options: sort_options,
+ };
+ if class.contains(&ordered_column) {
+ return Some(class.head().clone());
}
}
None
@@ -227,47 +191,65 @@ fn normalize_expr_with_ordering_equivalence_properties(
Transformed::No(expr)
})
})
- .unwrap_or(expr)
+ .unwrap_or_else(|_| expr.clone());
+ if expr.ne(&normalized_expr) {
+ if let Some(col) = normalized_expr.as_any().downcast_ref::<Column>() {
+ for eq_class in eq_properties.iter() {
+ let head = eq_class.head();
+ if head.col.eq(col) {
+ // Use options of the normalized version:
+ return (normalized_expr, head.options);
+ }
+ }
+ }
+ }
+ (expr, sort_options)
+}
+
+fn normalize_sort_expr_with_equivalence_properties(
+ mut sort_expr: PhysicalSortExpr,
+ eq_properties: &[EquivalentClass],
+) -> PhysicalSortExpr {
+ sort_expr.expr =
+ normalize_expr_with_equivalence_properties(sort_expr.expr, eq_properties);
+ sort_expr
}
fn normalize_sort_expr_with_ordering_equivalence_properties(
- sort_expr: PhysicalSortExpr,
+ mut sort_expr: PhysicalSortExpr,
eq_properties: &[OrderingEquivalentClass],
) -> PhysicalSortExpr {
- let requirement = normalize_sort_requirement_with_ordering_equivalence_properties(
- PhysicalSortRequirement::from(sort_expr),
- eq_properties,
- );
- requirement.into_sort_expr()
+ (sort_expr.expr, sort_expr.options) =
+ normalize_expr_with_ordering_equivalence_properties(
+ sort_expr.expr.clone(),
+ sort_expr.options,
+ eq_properties,
+ );
+ sort_expr
+}
+
+fn normalize_sort_requirement_with_equivalence_properties(
+ mut sort_requirement: PhysicalSortRequirement,
+ eq_properties: &[EquivalentClass],
+) -> PhysicalSortRequirement {
+ sort_requirement.expr =
+ normalize_expr_with_equivalence_properties(sort_requirement.expr, eq_properties);
+ sort_requirement
}
fn normalize_sort_requirement_with_ordering_equivalence_properties(
- sort_requirement: PhysicalSortRequirement,
+ mut sort_requirement: PhysicalSortRequirement,
eq_properties: &[OrderingEquivalentClass],
) -> PhysicalSortRequirement {
- let normalized_expr = normalize_expr_with_ordering_equivalence_properties(
- sort_requirement.expr().clone(),
- sort_requirement.options(),
- eq_properties,
- );
- if sort_requirement.expr().eq(&normalized_expr) {
- sort_requirement
- } else {
- let mut options = sort_requirement.options();
- if let Some(col) = normalized_expr.as_any().downcast_ref::<Column>() {
- for eq_class in eq_properties.iter() {
- let head = eq_class.head();
- if head.col.eq(col) {
- // If there is a requirement, update it with the requirement of its normalized version.
- if let Some(options) = &mut options {
- *options = head.options;
- }
- break;
- }
- }
- }
- PhysicalSortRequirement::new(normalized_expr, options)
+ if let Some(options) = &mut sort_requirement.options {
+ (sort_requirement.expr, *options) =
+ normalize_expr_with_ordering_equivalence_properties(
+ sort_requirement.expr,
+ *options,
+ eq_properties,
+ );
}
+ sort_requirement
}
pub fn normalize_sort_expr(
@@ -285,16 +267,16 @@ pub fn normalize_sort_expr(
pub fn normalize_sort_requirement(
sort_requirement: PhysicalSortRequirement,
- eq_classes: &[EquivalentClass],
- ordering_eq_classes: &[OrderingEquivalentClass],
+ eq_properties: &[EquivalentClass],
+ ordering_eq_properties: &[OrderingEquivalentClass],
) -> PhysicalSortRequirement {
let normalized = normalize_sort_requirement_with_equivalence_properties(
sort_requirement,
- eq_classes,
+ eq_properties,
);
normalize_sort_requirement_with_ordering_equivalence_properties(
normalized,
- ordering_eq_classes,
+ ordering_eq_properties,
)
}
@@ -331,29 +313,28 @@ fn ordering_satisfy_concrete<
equal_properties: F,
ordering_equal_properties: F2,
) -> bool {
- if required.len() > provided.len() {
- false
- } else if required
+ let oeq_properties = ordering_equal_properties();
+ let ordering_eq_classes = oeq_properties.classes();
+ let eq_properties = equal_properties();
+ let eq_classes = eq_properties.classes();
+ let mut required_normalized = Vec::new();
+ for expr in required {
+ let item = normalize_sort_expr(expr.clone(), eq_classes, ordering_eq_classes);
+ if !required_normalized.contains(&item) {
+ required_normalized.push(item);
+ }
+ }
+ let provided_normalized = provided
.iter()
- .zip(provided.iter())
- .all(|(req, given)| req.eq(given))
- {
- true
- } else {
- let oeq_properties = ordering_equal_properties();
- let ordering_eq_classes = oeq_properties.classes();
- let eq_properties = equal_properties();
- let eq_classes = eq_properties.classes();
- required
- .iter()
- .map(|e| normalize_sort_expr(e.clone(), eq_classes, ordering_eq_classes))
- .zip(
- provided.iter().map(|e| {
- normalize_sort_expr(e.clone(), eq_classes, ordering_eq_classes)
- }),
- )
- .all(|(req, given)| req == given)
+ .map(|e| normalize_sort_expr(e.clone(), eq_classes, ordering_eq_classes))
+ .collect::<Vec<_>>();
+ if required_normalized.len() > provided_normalized.len() {
+ return false;
}
+ required_normalized
+ .into_iter()
+ .zip(provided_normalized)
+ .all(|(req, given)| given == req)
}
/// Checks whether the given [`PhysicalSortRequirement`]s are satisfied by the
@@ -390,31 +371,29 @@ pub fn ordering_satisfy_requirement_concrete<
equal_properties: F,
ordering_equal_properties: F2,
) -> bool {
- if required.len() > provided.len() {
- false
- } else if required
+ let oeq_properties = ordering_equal_properties();
+ let ordering_eq_classes = oeq_properties.classes();
+ let eq_properties = equal_properties();
+ let eq_classes = eq_properties.classes();
+ let mut required_normalized = Vec::new();
+ for req in required {
+ let item =
+ normalize_sort_requirement(req.clone(), eq_classes, ordering_eq_classes);
+ if !required_normalized.contains(&item) {
+ required_normalized.push(item);
+ }
+ }
+ let provided_normalized = provided
.iter()
- .zip(provided.iter())
- .all(|(req, given)| given.satisfy(req))
- {
- true
- } else {
- let oeq_properties = ordering_equal_properties();
- let ordering_eq_classes = oeq_properties.classes();
- let eq_properties = equal_properties();
- let eq_classes = eq_properties.classes();
- required
- .iter()
- .map(|e| {
- normalize_sort_requirement(e.clone(), eq_classes, ordering_eq_classes)
- })
- .zip(
- provided.iter().map(|e| {
- normalize_sort_expr(e.clone(), eq_classes, ordering_eq_classes)
- }),
- )
- .all(|(req, given)| given.satisfy(&req))
+ .map(|e| normalize_sort_expr(e.clone(), eq_classes, ordering_eq_classes))
+ .collect::<Vec<_>>();
+ if required_normalized.len() > provided_normalized.len() {
+ return false;
}
+ required_normalized
+ .into_iter()
+ .zip(provided_normalized)
+ .all(|(req, given)| given.satisfy(&req))
}
/// Checks whether the given [`PhysicalSortRequirement`]s are equal or more
@@ -451,29 +430,29 @@ fn requirements_compatible_concrete<
ordering_equal_properties: F,
equal_properties: F2,
) -> bool {
- if required.len() > provided.len() {
- false
- } else if required
+ let oeq_properties = ordering_equal_properties();
+ let ordering_eq_classes = oeq_properties.classes();
+ let eq_properties = equal_properties();
+ let eq_classes = eq_properties.classes();
+ let mut required_normalized = Vec::new();
+ for req in required {
+ let item =
+ normalize_sort_requirement(req.clone(), eq_classes, ordering_eq_classes);
+ if !required_normalized.contains(&item) {
+ required_normalized.push(item);
+ }
+ }
+ let provided_normalized = provided
.iter()
- .zip(provided.iter())
- .all(|(req, given)| given.compatible(req))
- {
- true
- } else {
- let oeq_properties = ordering_equal_properties();
- let ordering_eq_classes = oeq_properties.classes();
- let eq_properties = equal_properties();
- let eq_classes = eq_properties.classes();
- required
- .iter()
- .map(|e| {
- normalize_sort_requirement(e.clone(), eq_classes, ordering_eq_classes)
- })
- .zip(provided.iter().map(|e| {
- normalize_sort_requirement(e.clone(), eq_classes, ordering_eq_classes)
- }))
- .all(|(req, given)| given.compatible(&req))
+ .map(|e| normalize_sort_requirement(e.clone(), eq_classes, ordering_eq_classes))
+ .collect::<Vec<_>>();
+ if required_normalized.len() > provided_normalized.len() {
+ return false;
}
+ required_normalized
+ .into_iter()
+ .zip(provided_normalized)
+ .all(|(req, given)| given.compatible(&req))
}
/// This function maps back requirement after ProjectionExec
@@ -1253,20 +1232,20 @@ mod tests {
// Test cases for ordering equivalence normalization
// First entry in the tuple is PhysicalExpr, second entry is its ordering, third entry is result after normalization.
let expressions = vec![
- (&col_d_expr, option1, &col_a_expr),
- (&col_e_expr, option2, &col_a_expr),
+ (&col_d_expr, option1, option1, &col_a_expr),
+ (&col_e_expr, option2, option1, &col_a_expr),
// Cannot normalize, hence should return itself.
- (&col_e_expr, option1, &col_e_expr),
+ (&col_e_expr, option1, option1, &col_e_expr),
];
- for (expr, sort_options, expected_ordering_eq) in expressions {
+ for (expr, sort_options, expected_options, expected_ordering_eq) in expressions {
+ let (normalized_expr, options) =
+ normalize_expr_with_ordering_equivalence_properties(
+ expr.clone(),
+ sort_options,
+ ordering_eq_properties.classes(),
+ );
assert!(
- expected_ordering_eq.eq(
- &normalize_expr_with_ordering_equivalence_properties(
- expr.clone(),
- Some(sort_options),
- ordering_eq_properties.classes()
- )
- ),
+ normalized_expr.eq(expected_ordering_eq) && (expected_options == options),
"error in test: expr: {expr:?}, sort_options: {sort_options:?}"
);
}
@@ -1427,4 +1406,89 @@ mod tests {
}
Ok(())
}
+
+ #[test]
+ fn test_ordering_satisfy_different_lengths() -> Result<()> {
+ let col_a = &Column::new("a", 0);
+ let col_b = &Column::new("b", 1);
+ let col_c = &Column::new("c", 2);
+ let col_d = &Column::new("d", 3);
+ let col_e = &Column::new("e", 4);
+ let test_schema = create_test_schema()?;
+ let option1 = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+ // Column a and c are aliases.
+ let mut eq_properties = EquivalenceProperties::new(test_schema.clone());
+ eq_properties.add_equal_conditions((col_a, col_c));
+
+ // Columns a and e are ordering equivalent (e.g. the global ordering of the table can be described both as a ASC and as e ASC).
+ let mut ordering_eq_properties = OrderingEquivalenceProperties::new(test_schema);
+ ordering_eq_properties.add_equal_conditions((
+ &OrderedColumn::new(col_a.clone(), option1),
+ &OrderedColumn::new(col_e.clone(), option1),
+ ));
+ let sort_req_a = PhysicalSortExpr {
+ expr: Arc::new((col_a).clone()) as _,
+ options: option1,
+ };
+ let sort_req_b = PhysicalSortExpr {
+ expr: Arc::new((col_b).clone()) as _,
+ options: option1,
+ };
+ let sort_req_c = PhysicalSortExpr {
+ expr: Arc::new((col_c).clone()) as _,
+ options: option1,
+ };
+ let sort_req_d = PhysicalSortExpr {
+ expr: Arc::new((col_d).clone()) as _,
+ options: option1,
+ };
+ let sort_req_e = PhysicalSortExpr {
+ expr: Arc::new((col_e).clone()) as _,
+ options: option1,
+ };
+
+ assert!(ordering_satisfy_concrete(
+ // After normalization would be a ASC, b ASC, d ASC
+ &[sort_req_a.clone(), sort_req_b.clone(), sort_req_d.clone()],
+ // After normalization would be a ASC, b ASC, d ASC
+ &[
+ sort_req_c.clone(),
+ sort_req_b.clone(),
+ sort_req_a.clone(),
+ sort_req_d.clone(),
+ sort_req_e.clone(),
+ ],
+ || eq_properties.clone(),
+ || ordering_eq_properties.clone(),
+ ));
+
+ assert!(!ordering_satisfy_concrete(
+ // After normalization would be a ASC, b ASC
+ &[sort_req_a.clone(), sort_req_b.clone()],
+ // After normalization would be a ASC, b ASC, d ASC
+ &[
+ sort_req_c.clone(),
+ sort_req_b.clone(),
+ sort_req_a.clone(),
+ sort_req_d.clone(),
+ sort_req_e.clone(),
+ ],
+ || eq_properties.clone(),
+ || ordering_eq_properties.clone(),
+ ));
+
+ assert!(!ordering_satisfy_concrete(
+ // After normalization would be a ASC, b ASC, d ASC
+ &[sort_req_a.clone(), sort_req_b.clone(), sort_req_d.clone()],
+ // After normalization would be a ASC, d ASC, b ASC
+ &[sort_req_c, sort_req_d, sort_req_a, sort_req_b, sort_req_e,],
+ || eq_properties.clone(),
+ || ordering_eq_properties.clone(),
+ ));
+
+ Ok(())
+ }
}