You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ak...@apache.org on 2023/05/12 21:36:11 UTC

[arrow-datafusion] branch main updated: Ordering satisfy consider ordering equivalence of different lengths (#6330)

This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b21b61559 Ordering satisfy consider ordering equivalence of different lengths (#6330)
4b21b61559 is described below

commit 4b21b61559f2f6511736a7b274f681aaa088b111
Author: Berkay Şahin <12...@users.noreply.github.com>
AuthorDate: Sat May 13 00:36:04 2023 +0300

    Ordering satisfy consider ordering equivalence of different lengths (#6330)
    
    * Length check is postponed, and compression step is added.
    
    * Update/Add comments, minor changes
    
    * Various clean-ups and clone removals
    
    * Simplifications
    
    * Simplify code structure
    
    ---------
    
    Co-authored-by: Mustafa Akur <mu...@synnada.ai>
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
 .../core/src/physical_optimizer/sort_pushdown.rs   |   2 +-
 datafusion/core/src/physical_plan/windows/mod.rs   |   2 +-
 .../core/tests/sqllogictests/test_files/window.slt |  25 ++
 datafusion/physical-expr/src/equivalence.rs        |   8 +-
 datafusion/physical-expr/src/sort_expr.rs          |  45 +--
 datafusion/physical-expr/src/utils.rs              | 382 ++++++++++++---------
 6 files changed, 269 insertions(+), 195 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
index a0490fa64e..20a5038b7a 100644
--- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -397,7 +397,7 @@ fn shift_right_required(
     let new_right_required: Vec<PhysicalSortRequirement> = parent_required
         .iter()
         .filter_map(|r| {
-            let Some(col) = r.expr().as_any().downcast_ref::<Column>() else {
+            let Some(col) = r.expr.as_any().downcast_ref::<Column>() else {
                 return None;
             };
 
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index 40ef8a3e6a..f6fe3bcaee 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -212,7 +212,7 @@ pub(crate) fn calc_requirements<
         .collect::<Vec<_>>();
     for element in orderby_sort_exprs.into_iter() {
         let PhysicalSortExpr { expr, options } = element.borrow();
-        if !sort_reqs.iter().any(|e| e.expr().eq(expr)) {
+        if !sort_reqs.iter().any(|e| e.expr.eq(expr)) {
             sort_reqs.push(PhysicalSortRequirement::new(expr.clone(), Some(*options)));
         }
     }
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 9790871ff2..02c5e6cdb0 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -2379,6 +2379,31 @@ select row_number() over (rows between null preceding and current row) from (sel
 statement error DataFusion error: Error during planning: Invalid window frame: frame offsets must be non negative integers 
 select row_number() over (rows between current row and -1 following) from (select 1 a) x
 
+# This test shows that the ordering-satisfy check takes ordering equivalences into account
+# and can simplify multi-expression requirements (reducing their size) during normalization.
+# In the example below, the requirement `rn1 ASC, c9 DESC` should be simplified to `rn1 ASC`.
+# Hence, the final plan should not contain a SortExec for the global ORDER BY.
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                       ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+                       FROM aggregate_test_100
+                       ORDER BY c9 DESC)
+       ORDER BY rn1, c9 DESC
+       LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: rn1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST, fetch=5
+    Sort: aggregate_test_100.c9 DESC NULLS FIRST
+      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+          TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
+    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+      SortExec: expr=[c9@0 DESC]
+        CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
 
 # The following query has type error. We should test the error could be detected
 # from either the logical plan (when `skip_failed_rules` is set to `false`) or
diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs
index d46f56e46a..ab3443424b 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -52,8 +52,8 @@ impl<T: Eq + Hash + Clone> EquivalenceProperties<T> {
         }
     }
 
-    /// Add new equal conditions into the EquivalenceProperties, the new equal conditions are usually comming from the
-    /// equality predicates in Join or Filter
+    /// Adds new equal conditions into the EquivalenceProperties. New equal
+    /// conditions usually come from equality predicates in a join/filter.
     pub fn add_equal_conditions(&mut self, new_conditions: (&T, &T)) {
         let mut idx1: Option<usize> = None;
         let mut idx2: Option<usize> = None;
@@ -153,7 +153,7 @@ impl<T: Eq + Hash + Clone> EquivalentClass<T> {
     }
 
     pub fn insert(&mut self, col: T) -> bool {
-        self.others.insert(col)
+        self.head != col && self.others.insert(col)
     }
 
     pub fn remove(&mut self, col: &T) -> bool {
@@ -168,7 +168,7 @@ impl<T: Eq + Hash + Clone> EquivalentClass<T> {
                 false
             }
         } else {
-            true
+            removed
         }
     }
 
diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs
index f19ae67004..78cdd87db2 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr/src/sort_expr.rs
@@ -100,15 +100,26 @@ impl PhysicalSortExpr {
 #[derive(Clone, Debug)]
 pub struct PhysicalSortRequirement {
     /// Physical expression representing the column to sort
-    expr: Arc<dyn PhysicalExpr>,
+    pub expr: Arc<dyn PhysicalExpr>,
     /// Option to specify how the given column should be sorted.
     /// If unspecified, there are no constraints on sort options.
-    options: Option<SortOptions>,
+    pub options: Option<SortOptions>,
 }
 
 impl From<PhysicalSortRequirement> for PhysicalSortExpr {
+    /// If options is `None`, the default sort options `ASC, NULLS LAST` are used.
+    ///
+    /// The default is picked to be consistent with
+    /// PostgreSQL: <https://www.postgresql.org/docs/current/queries-order.html>
     fn from(value: PhysicalSortRequirement) -> Self {
-        value.into_sort_expr()
+        let options = value.options.unwrap_or(SortOptions {
+            descending: false,
+            nulls_first: false,
+        });
+        PhysicalSortExpr {
+            expr: value.expr,
+            options,
+        }
     }
 }
 
@@ -151,22 +162,6 @@ impl PhysicalSortRequirement {
         self
     }
 
-    /// Converts the `PhysicalSortRequirement` to `PhysicalSortExpr`.
-    /// If required ordering is `None` for an entry, the default
-    /// ordering `ASC, NULLS LAST` is used.
-    ///
-    /// The default is picked to be consistent with
-    /// PostgreSQL: <https://www.postgresql.org/docs/current/queries-order.html>
-    pub fn into_sort_expr(self) -> PhysicalSortExpr {
-        let Self { expr, options } = self;
-
-        let options = options.unwrap_or(SortOptions {
-            descending: false,
-            nulls_first: false,
-        });
-        PhysicalSortExpr { expr, options }
-    }
-
     /// Returns whether this requirement is equal or more specific than `other`.
     pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool {
         self.expr.eq(&other.expr)
@@ -197,7 +192,7 @@ impl PhysicalSortRequirement {
     ///
     /// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
     /// for each entry in the input. If required ordering is None for an entry
-    /// default ordering `ASC, NULLS LAST` if given (see [`Self::into_sort_expr`])
+    /// the default ordering `ASC, NULLS LAST` is used (see `PhysicalSortExpr::from`).
     pub fn to_sort_exprs(
         requirements: impl IntoIterator<Item = PhysicalSortRequirement>,
     ) -> Vec<PhysicalSortExpr> {
@@ -206,16 +201,6 @@ impl PhysicalSortRequirement {
             .map(PhysicalSortExpr::from)
             .collect()
     }
-
-    /// Returns the expr for this requirement
-    pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
-        &self.expr
-    }
-
-    /// Returns the required options, for this requirement
-    pub fn options(&self) -> Option<SortOptions> {
-        self.options
-    }
 }
 
 /// Returns the SQL string representation of the given [SortOptions] object.
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index 3b186ddf8a..967ecdb40f 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -124,10 +124,7 @@ pub fn normalize_out_expr_with_columns_map(
 ) -> Arc<dyn PhysicalExpr> {
     expr.clone()
         .transform(&|expr| {
-            let normalized_form: Option<Arc<dyn PhysicalExpr>> = match expr
-                .as_any()
-                .downcast_ref::<Column>()
-            {
+            let normalized_form = match expr.as_any().downcast_ref::<Column>() {
                 Some(column) => columns_map
                     .get(column)
                     .map(|c| Arc::new(c[0].clone()) as _)
@@ -149,7 +146,7 @@ pub fn normalize_expr_with_equivalence_properties(
 ) -> Arc<dyn PhysicalExpr> {
     expr.clone()
         .transform(&|expr| {
-            let normalized_form: Option<Arc<dyn PhysicalExpr>> =
+            let normalized_form =
                 expr.as_any().downcast_ref::<Column>().and_then(|column| {
                     for class in eq_properties {
                         if class.contains(column) {
@@ -167,56 +164,23 @@ pub fn normalize_expr_with_equivalence_properties(
         .unwrap_or(expr)
 }
 
-fn normalize_sort_expr_with_equivalence_properties(
-    sort_expr: PhysicalSortExpr,
-    eq_properties: &[EquivalentClass],
-) -> PhysicalSortExpr {
-    let normalized_expr =
-        normalize_expr_with_equivalence_properties(sort_expr.expr.clone(), eq_properties);
-
-    if sort_expr.expr.ne(&normalized_expr) {
-        PhysicalSortExpr {
-            expr: normalized_expr,
-            options: sort_expr.options,
-        }
-    } else {
-        sort_expr
-    }
-}
-
-fn normalize_sort_requirement_with_equivalence_properties(
-    sort_requirement: PhysicalSortRequirement,
-    eq_properties: &[EquivalentClass],
-) -> PhysicalSortRequirement {
-    let normalized_expr = normalize_expr_with_equivalence_properties(
-        sort_requirement.expr().clone(),
-        eq_properties,
-    );
-    if sort_requirement.expr().ne(&normalized_expr) {
-        sort_requirement.with_expr(normalized_expr)
-    } else {
-        sort_requirement
-    }
-}
-
 fn normalize_expr_with_ordering_equivalence_properties(
     expr: Arc<dyn PhysicalExpr>,
-    sort_options: Option<SortOptions>,
+    sort_options: SortOptions,
     eq_properties: &[OrderingEquivalentClass],
-) -> Arc<dyn PhysicalExpr> {
-    expr.clone()
+) -> (Arc<dyn PhysicalExpr>, SortOptions) {
+    let normalized_expr = expr
+        .clone()
         .transform(&|expr| {
             let normalized_form =
                 expr.as_any().downcast_ref::<Column>().and_then(|column| {
-                    if let Some(options) = sort_options {
-                        for class in eq_properties {
-                            let ordered_column = OrderedColumn {
-                                col: column.clone(),
-                                options,
-                            };
-                            if class.contains(&ordered_column) {
-                                return Some(class.head().clone());
-                            }
+                    for class in eq_properties {
+                        let ordered_column = OrderedColumn {
+                            col: column.clone(),
+                            options: sort_options,
+                        };
+                        if class.contains(&ordered_column) {
+                            return Some(class.head().clone());
                         }
                     }
                     None
@@ -227,47 +191,65 @@ fn normalize_expr_with_ordering_equivalence_properties(
                 Transformed::No(expr)
             })
         })
-        .unwrap_or(expr)
+        .unwrap_or_else(|_| expr.clone());
+    if expr.ne(&normalized_expr) {
+        if let Some(col) = normalized_expr.as_any().downcast_ref::<Column>() {
+            for eq_class in eq_properties.iter() {
+                let head = eq_class.head();
+                if head.col.eq(col) {
+                    // Use options of the normalized version:
+                    return (normalized_expr, head.options);
+                }
+            }
+        }
+    }
+    (expr, sort_options)
+}
+
+fn normalize_sort_expr_with_equivalence_properties(
+    mut sort_expr: PhysicalSortExpr,
+    eq_properties: &[EquivalentClass],
+) -> PhysicalSortExpr {
+    sort_expr.expr =
+        normalize_expr_with_equivalence_properties(sort_expr.expr, eq_properties);
+    sort_expr
 }
 
 fn normalize_sort_expr_with_ordering_equivalence_properties(
-    sort_expr: PhysicalSortExpr,
+    mut sort_expr: PhysicalSortExpr,
     eq_properties: &[OrderingEquivalentClass],
 ) -> PhysicalSortExpr {
-    let requirement = normalize_sort_requirement_with_ordering_equivalence_properties(
-        PhysicalSortRequirement::from(sort_expr),
-        eq_properties,
-    );
-    requirement.into_sort_expr()
+    (sort_expr.expr, sort_expr.options) =
+        normalize_expr_with_ordering_equivalence_properties(
+            sort_expr.expr.clone(),
+            sort_expr.options,
+            eq_properties,
+        );
+    sort_expr
+}
+
+fn normalize_sort_requirement_with_equivalence_properties(
+    mut sort_requirement: PhysicalSortRequirement,
+    eq_properties: &[EquivalentClass],
+) -> PhysicalSortRequirement {
+    sort_requirement.expr =
+        normalize_expr_with_equivalence_properties(sort_requirement.expr, eq_properties);
+    sort_requirement
 }
 
 fn normalize_sort_requirement_with_ordering_equivalence_properties(
-    sort_requirement: PhysicalSortRequirement,
+    mut sort_requirement: PhysicalSortRequirement,
     eq_properties: &[OrderingEquivalentClass],
 ) -> PhysicalSortRequirement {
-    let normalized_expr = normalize_expr_with_ordering_equivalence_properties(
-        sort_requirement.expr().clone(),
-        sort_requirement.options(),
-        eq_properties,
-    );
-    if sort_requirement.expr().eq(&normalized_expr) {
-        sort_requirement
-    } else {
-        let mut options = sort_requirement.options();
-        if let Some(col) = normalized_expr.as_any().downcast_ref::<Column>() {
-            for eq_class in eq_properties.iter() {
-                let head = eq_class.head();
-                if head.col.eq(col) {
-                    // If there is a requirement, update it with the requirement of its normalized version.
-                    if let Some(options) = &mut options {
-                        *options = head.options;
-                    }
-                    break;
-                }
-            }
-        }
-        PhysicalSortRequirement::new(normalized_expr, options)
+    if let Some(options) = &mut sort_requirement.options {
+        (sort_requirement.expr, *options) =
+            normalize_expr_with_ordering_equivalence_properties(
+                sort_requirement.expr,
+                *options,
+                eq_properties,
+            );
     }
+    sort_requirement
 }
 
 pub fn normalize_sort_expr(
@@ -285,16 +267,16 @@ pub fn normalize_sort_expr(
 
 pub fn normalize_sort_requirement(
     sort_requirement: PhysicalSortRequirement,
-    eq_classes: &[EquivalentClass],
-    ordering_eq_classes: &[OrderingEquivalentClass],
+    eq_properties: &[EquivalentClass],
+    ordering_eq_properties: &[OrderingEquivalentClass],
 ) -> PhysicalSortRequirement {
     let normalized = normalize_sort_requirement_with_equivalence_properties(
         sort_requirement,
-        eq_classes,
+        eq_properties,
     );
     normalize_sort_requirement_with_ordering_equivalence_properties(
         normalized,
-        ordering_eq_classes,
+        ordering_eq_properties,
     )
 }
 
@@ -331,29 +313,28 @@ fn ordering_satisfy_concrete<
     equal_properties: F,
     ordering_equal_properties: F2,
 ) -> bool {
-    if required.len() > provided.len() {
-        false
-    } else if required
+    let oeq_properties = ordering_equal_properties();
+    let ordering_eq_classes = oeq_properties.classes();
+    let eq_properties = equal_properties();
+    let eq_classes = eq_properties.classes();
+    let mut required_normalized = Vec::new();
+    for expr in required {
+        let item = normalize_sort_expr(expr.clone(), eq_classes, ordering_eq_classes);
+        if !required_normalized.contains(&item) {
+            required_normalized.push(item);
+        }
+    }
+    let provided_normalized = provided
         .iter()
-        .zip(provided.iter())
-        .all(|(req, given)| req.eq(given))
-    {
-        true
-    } else {
-        let oeq_properties = ordering_equal_properties();
-        let ordering_eq_classes = oeq_properties.classes();
-        let eq_properties = equal_properties();
-        let eq_classes = eq_properties.classes();
-        required
-            .iter()
-            .map(|e| normalize_sort_expr(e.clone(), eq_classes, ordering_eq_classes))
-            .zip(
-                provided.iter().map(|e| {
-                    normalize_sort_expr(e.clone(), eq_classes, ordering_eq_classes)
-                }),
-            )
-            .all(|(req, given)| req == given)
+        .map(|e| normalize_sort_expr(e.clone(), eq_classes, ordering_eq_classes))
+        .collect::<Vec<_>>();
+    if required_normalized.len() > provided_normalized.len() {
+        return false;
     }
+    required_normalized
+        .into_iter()
+        .zip(provided_normalized)
+        .all(|(req, given)| given == req)
 }
 
 /// Checks whether the given [`PhysicalSortRequirement`]s are satisfied by the
@@ -390,31 +371,29 @@ pub fn ordering_satisfy_requirement_concrete<
     equal_properties: F,
     ordering_equal_properties: F2,
 ) -> bool {
-    if required.len() > provided.len() {
-        false
-    } else if required
+    let oeq_properties = ordering_equal_properties();
+    let ordering_eq_classes = oeq_properties.classes();
+    let eq_properties = equal_properties();
+    let eq_classes = eq_properties.classes();
+    let mut required_normalized = Vec::new();
+    for req in required {
+        let item =
+            normalize_sort_requirement(req.clone(), eq_classes, ordering_eq_classes);
+        if !required_normalized.contains(&item) {
+            required_normalized.push(item);
+        }
+    }
+    let provided_normalized = provided
         .iter()
-        .zip(provided.iter())
-        .all(|(req, given)| given.satisfy(req))
-    {
-        true
-    } else {
-        let oeq_properties = ordering_equal_properties();
-        let ordering_eq_classes = oeq_properties.classes();
-        let eq_properties = equal_properties();
-        let eq_classes = eq_properties.classes();
-        required
-            .iter()
-            .map(|e| {
-                normalize_sort_requirement(e.clone(), eq_classes, ordering_eq_classes)
-            })
-            .zip(
-                provided.iter().map(|e| {
-                    normalize_sort_expr(e.clone(), eq_classes, ordering_eq_classes)
-                }),
-            )
-            .all(|(req, given)| given.satisfy(&req))
+        .map(|e| normalize_sort_expr(e.clone(), eq_classes, ordering_eq_classes))
+        .collect::<Vec<_>>();
+    if required_normalized.len() > provided_normalized.len() {
+        return false;
     }
+    required_normalized
+        .into_iter()
+        .zip(provided_normalized)
+        .all(|(req, given)| given.satisfy(&req))
 }
 
 /// Checks whether the given [`PhysicalSortRequirement`]s are equal or more
@@ -451,29 +430,29 @@ fn requirements_compatible_concrete<
     ordering_equal_properties: F,
     equal_properties: F2,
 ) -> bool {
-    if required.len() > provided.len() {
-        false
-    } else if required
+    let oeq_properties = ordering_equal_properties();
+    let ordering_eq_classes = oeq_properties.classes();
+    let eq_properties = equal_properties();
+    let eq_classes = eq_properties.classes();
+    let mut required_normalized = Vec::new();
+    for req in required {
+        let item =
+            normalize_sort_requirement(req.clone(), eq_classes, ordering_eq_classes);
+        if !required_normalized.contains(&item) {
+            required_normalized.push(item);
+        }
+    }
+    let provided_normalized = provided
         .iter()
-        .zip(provided.iter())
-        .all(|(req, given)| given.compatible(req))
-    {
-        true
-    } else {
-        let oeq_properties = ordering_equal_properties();
-        let ordering_eq_classes = oeq_properties.classes();
-        let eq_properties = equal_properties();
-        let eq_classes = eq_properties.classes();
-        required
-            .iter()
-            .map(|e| {
-                normalize_sort_requirement(e.clone(), eq_classes, ordering_eq_classes)
-            })
-            .zip(provided.iter().map(|e| {
-                normalize_sort_requirement(e.clone(), eq_classes, ordering_eq_classes)
-            }))
-            .all(|(req, given)| given.compatible(&req))
+        .map(|e| normalize_sort_requirement(e.clone(), eq_classes, ordering_eq_classes))
+        .collect::<Vec<_>>();
+    if required_normalized.len() > provided_normalized.len() {
+        return false;
     }
+    required_normalized
+        .into_iter()
+        .zip(provided_normalized)
+        .all(|(req, given)| given.compatible(&req))
 }
 
 /// This function maps back requirement after ProjectionExec
@@ -1253,20 +1232,20 @@ mod tests {
         // Test cases for ordering equivalence normalization
         // First entry in the tuple is PhysicalExpr, second entry is its ordering, third entry is result after normalization.
         let expressions = vec![
-            (&col_d_expr, option1, &col_a_expr),
-            (&col_e_expr, option2, &col_a_expr),
+            (&col_d_expr, option1, option1, &col_a_expr),
+            (&col_e_expr, option2, option1, &col_a_expr),
             // Cannot normalize, hence should return itself.
-            (&col_e_expr, option1, &col_e_expr),
+            (&col_e_expr, option1, option1, &col_e_expr),
         ];
-        for (expr, sort_options, expected_ordering_eq) in expressions {
+        for (expr, sort_options, expected_options, expected_ordering_eq) in expressions {
+            let (normalized_expr, options) =
+                normalize_expr_with_ordering_equivalence_properties(
+                    expr.clone(),
+                    sort_options,
+                    ordering_eq_properties.classes(),
+                );
             assert!(
-                expected_ordering_eq.eq(
-                    &normalize_expr_with_ordering_equivalence_properties(
-                        expr.clone(),
-                        Some(sort_options),
-                        ordering_eq_properties.classes()
-                    )
-                ),
+                normalized_expr.eq(expected_ordering_eq) && (expected_options == options),
                 "error in test: expr: {expr:?}, sort_options: {sort_options:?}"
             );
         }
@@ -1427,4 +1406,89 @@ mod tests {
         }
         Ok(())
     }
+
+    #[test]
+    fn test_ordering_satisfy_different_lengths() -> Result<()> {
+        let col_a = &Column::new("a", 0);
+        let col_b = &Column::new("b", 1);
+        let col_c = &Column::new("c", 2);
+        let col_d = &Column::new("d", 3);
+        let col_e = &Column::new("e", 4);
+        let test_schema = create_test_schema()?;
+        let option1 = SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+        // Column a and c are aliases.
+        let mut eq_properties = EquivalenceProperties::new(test_schema.clone());
+        eq_properties.add_equal_conditions((col_a, col_c));
+
+        // Column a and e are ordering equivalent (e.g. the global ordering of the table can be described both as a ASC and e ASC).
+        let mut ordering_eq_properties = OrderingEquivalenceProperties::new(test_schema);
+        ordering_eq_properties.add_equal_conditions((
+            &OrderedColumn::new(col_a.clone(), option1),
+            &OrderedColumn::new(col_e.clone(), option1),
+        ));
+        let sort_req_a = PhysicalSortExpr {
+            expr: Arc::new((col_a).clone()) as _,
+            options: option1,
+        };
+        let sort_req_b = PhysicalSortExpr {
+            expr: Arc::new((col_b).clone()) as _,
+            options: option1,
+        };
+        let sort_req_c = PhysicalSortExpr {
+            expr: Arc::new((col_c).clone()) as _,
+            options: option1,
+        };
+        let sort_req_d = PhysicalSortExpr {
+            expr: Arc::new((col_d).clone()) as _,
+            options: option1,
+        };
+        let sort_req_e = PhysicalSortExpr {
+            expr: Arc::new((col_e).clone()) as _,
+            options: option1,
+        };
+
+        assert!(ordering_satisfy_concrete(
+            // After normalization would be a ASC, b ASC, d ASC
+            &[sort_req_a.clone(), sort_req_b.clone(), sort_req_d.clone()],
+            // After normalization would be a ASC, b ASC, d ASC
+            &[
+                sort_req_c.clone(),
+                sort_req_b.clone(),
+                sort_req_a.clone(),
+                sort_req_d.clone(),
+                sort_req_e.clone(),
+            ],
+            || eq_properties.clone(),
+            || ordering_eq_properties.clone(),
+        ));
+
+        assert!(!ordering_satisfy_concrete(
+            // After normalization would be a ASC, b ASC
+            &[sort_req_a.clone(), sort_req_b.clone()],
+            // After normalization would be a ASC, b ASC, d ASC
+            &[
+                sort_req_c.clone(),
+                sort_req_b.clone(),
+                sort_req_a.clone(),
+                sort_req_d.clone(),
+                sort_req_e.clone(),
+            ],
+            || eq_properties.clone(),
+            || ordering_eq_properties.clone(),
+        ));
+
+        assert!(!ordering_satisfy_concrete(
+            // After normalization would be a ASC, b ASC, d ASC
+            &[sort_req_a.clone(), sort_req_b.clone(), sort_req_d.clone()],
+            // After normalization would be a ASC, d ASC, b ASC
+            &[sort_req_c, sort_req_d, sort_req_a, sort_req_b, sort_req_e,],
+            || eq_properties.clone(),
+            || ordering_eq_properties.clone(),
+        ));
+
+        Ok(())
+    }
 }