You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ak...@apache.org on 2023/06/05 07:43:44 UTC

[arrow-datafusion] branch main updated: Support ordering analysis with expressions (not just columns) by Replace `OrderedColumn` with `PhysicalSortExpr` (#6501)

This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c88eecf9d Support ordering analysis with expressions (not just columns) by Replace `OrderedColumn` with `PhysicalSortExpr` (#6501)
0c88eecf9d is described below

commit 0c88eecf9d5ba09819b147ec1a0b074bd7ff75bd
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Mon Jun 5 10:43:37 2023 +0300

    Support ordering analysis with expressions (not just columns) by Replace `OrderedColumn` with `PhysicalSortExpr` (#6501)
    
    * remove hash dependency from EquivalenceProperties
    
    * Convert OrderedColumn to PhysicalSortExpr
    
    * Convert unit tests to new API.
    
    * Simplifications
    
    * Simplifications
    
    * Add new test
    
    * Update comments, move type definition to common place.
    
    * Update comment
    
    * change function name
---
 .../core/src/physical_plan/aggregates/mod.rs       |  14 +-
 datafusion/core/src/physical_plan/mod.rs           |  20 +-
 datafusion/core/src/physical_plan/windows/mod.rs   |  11 +-
 .../core/tests/sqllogictests/test_files/window.slt |  24 +++
 datafusion/physical-expr/src/equivalence.rs        | 238 +++++++++++++--------
 datafusion/physical-expr/src/lib.rs                |   2 +-
 datafusion/physical-expr/src/utils.rs              |  42 +++-
 7 files changed, 221 insertions(+), 130 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 455a86660e..f96a09c8e9 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -1201,8 +1201,8 @@ mod tests {
         lit, ApproxDistinct, Column, Count, FirstValue, Median,
     };
     use datafusion_physical_expr::{
-        AggregateExpr, EquivalenceProperties, OrderedColumn,
-        OrderingEquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+        AggregateExpr, EquivalenceProperties, OrderingEquivalenceProperties,
+        PhysicalExpr, PhysicalSortExpr,
     };
     use futures::{FutureExt, Stream};
     use std::any::Any;
@@ -1860,8 +1860,14 @@ mod tests {
         eq_properties.add_equal_conditions((&col_a, &col_b));
         let mut ordering_eq_properties = OrderingEquivalenceProperties::new(test_schema);
         ordering_eq_properties.add_equal_conditions((
-            &vec![OrderedColumn::new(col_a.clone(), options1)],
-            &vec![OrderedColumn::new(col_c.clone(), options2)],
+            &vec![PhysicalSortExpr {
+                expr: Arc::new(col_a.clone()) as _,
+                options: options1,
+            }],
+            &vec![PhysicalSortExpr {
+                expr: Arc::new(col_c.clone()) as _,
+                options: options2,
+            }],
         ));
         let mut order_by_exprs = vec![
             None,
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 155d79e7e8..deff619b4f 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -588,26 +588,10 @@ pub fn ordering_equivalence_properties_helper(
         // Return an empty OrderingEquivalenceProperties:
         return oep;
     };
-    let first_column = first_ordering
-        .iter()
-        .map(|e| TryFrom::try_from(e.clone()))
-        .collect::<Result<Vec<_>>>();
-    let checked_column_first = if let Ok(first) = first_column {
-        first
-    } else {
-        // Return an empty OrderingEquivalenceProperties:
-        return oep;
-    };
     // First entry among eq_orderings is the head, skip it:
     for ordering in eq_orderings.iter().skip(1) {
-        let column = ordering
-            .iter()
-            .map(|e| TryFrom::try_from(e.clone()))
-            .collect::<Result<Vec<_>>>();
-        if let Ok(column) = column {
-            if !column.is_empty() {
-                oep.add_equal_conditions((&checked_column_first, &column))
-            }
+        if !ordering.is_empty() {
+            oep.add_equal_conditions((first_ordering, ordering))
         }
     }
     oep
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index d7eedf7f18..f773f3b549 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -53,9 +53,7 @@ use datafusion_physical_expr::utils::{convert_to_expr, get_indices_of_matching_e
 pub use datafusion_physical_expr::window::{
     BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
 };
-use datafusion_physical_expr::{
-    OrderedColumn, OrderingEquivalenceProperties, PhysicalSortRequirement,
-};
+use datafusion_physical_expr::{OrderingEquivalenceProperties, PhysicalSortRequirement};
 pub use window_agg_exec::WindowAggExec;
 
 /// Create a physical expression for window function
@@ -270,14 +268,17 @@ pub(crate) fn window_ordering_equivalence(
                 .is::<RowNumber>()
             {
                 if let Some((idx, field)) =
-                    schema.column_with_name(expr.field().unwrap().name())
+                    schema.column_with_name(builtin_window_expr.name())
                 {
                     let column = Column::new(field.name(), idx);
                     let options = SortOptions {
                         descending: false,
                         nulls_first: false,
                     }; // ASC, NULLS LAST
-                    let rhs = OrderedColumn::new(column, options);
+                    let rhs = PhysicalSortExpr {
+                        expr: Arc::new(column) as _,
+                        options,
+                    };
                     builder.add_equal_conditions(vec![rhs]);
                 }
             }
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt
index f1280fa688..c0b861fd8a 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -2405,6 +2405,30 @@ GlobalLimitExec: skip=0, fetch=5
 ------SortExec: expr=[c9@0 DESC]
 --------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
 
+# This test shows that ordering equivalence can keep track of complex expressions (not just Column expressions)
+# during ordering-satisfaction analysis. In the final plan we should see only a single SortExec.
+query TT
+EXPLAIN SELECT c5, c9, rn1 FROM (SELECT c5, c9,
+                       ROW_NUMBER() OVER(ORDER BY c9 + c5 DESC) as rn1
+                       FROM aggregate_test_100
+                       ORDER BY c9 + c5 DESC)
+       ORDER BY rn1, c9 + c5 DESC
+       LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Sort: rn1 ASC NULLS LAST, CAST(aggregate_test_100.c9 AS Int32) + aggregate_test_100.c5 DESC NULLS FIRST, fetch=5
+----Sort: CAST(aggregate_test_100.c9 AS Int32) + aggregate_test_100.c5 DESC NULLS FIRST
+------Projection: aggregate_test_100.c5, aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [CAST(aggregate_test_100.c9 AS Int32) + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c5, c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+--ProjectionExec: expr=[c5@0 as c5, c9@1 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as rn1]
+----BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_b [...]
+------SortExec: expr=[CAST(c9@1 AS Int32) + c5@0 DESC]
+--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5, c9], has_header=true
+
 # The following query has type error. We should test the error could be detected
 # from either the logical plan (when `skip_failed_rules` is set to `false`) or
 # the physical plan (when `skip_failed_rules` is set to `true`).
diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs
index 83f26a1d06..78279851bb 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -15,17 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::expressions::Column;
+use crate::expressions::{BinaryExpr, Column};
 use crate::{
-    normalize_expr_with_equivalence_properties, PhysicalSortExpr, PhysicalSortRequirement,
+    normalize_expr_with_equivalence_properties, LexOrdering, PhysicalExpr,
+    PhysicalSortExpr,
 };
 
 use arrow::datatypes::SchemaRef;
-use arrow_schema::SortOptions;
-use datafusion_common::DataFusionError;
 
-use std::collections::{HashMap, HashSet};
-use std::hash::Hash;
+use std::collections::HashMap;
 use std::sync::Arc;
 
 /// Represents a collection of [`EquivalentClass`] (equivalences
@@ -34,14 +32,14 @@ use std::sync::Arc;
 /// This is used to represent both:
 ///
 /// 1. Equality conditions (like `A=B`), when `T` = [`Column`]
-/// 2. Ordering (like `A ASC = B ASC`), when `T` = [`OrderedColumn`]
+/// 2. Ordering (like `A ASC = B ASC`), when `T` = [`PhysicalSortExpr`]
 #[derive(Debug, Clone)]
 pub struct EquivalenceProperties<T = Column> {
     classes: Vec<EquivalentClass<T>>,
     schema: SchemaRef,
 }
 
-impl<T: Eq + Hash + Clone> EquivalenceProperties<T> {
+impl<T: PartialEq + Clone> EquivalenceProperties<T> {
     pub fn new(schema: SchemaRef) -> Self {
         EquivalenceProperties {
             classes: vec![],
@@ -115,6 +113,53 @@ impl<T: Eq + Hash + Clone> EquivalenceProperties<T> {
     }
 }
 
+/// Removes duplicates from the `in_data` vector; the returned vector consists of unique entries in first-occurrence order.
+fn deduplicate_vector<T: PartialEq>(in_data: Vec<T>) -> Vec<T> {
+    let mut result = vec![];
+    for elem in in_data {
+        if !result.contains(&elem) {
+            result.push(elem);
+        }
+    }
+    result
+}
+
+/// Find the position of `entry` inside `in_data`, if `entry` is not found return `None`.
+fn get_entry_position<T: PartialEq>(in_data: &[T], entry: &T) -> Option<usize> {
+    in_data.iter().position(|item| item.eq(entry))
+}
+
+/// Removes the first occurrence of `entry` from `in_data`. Returns `true` if the removal is successful (i.e. `entry`
+/// is indeed in `in_data`); otherwise returns `false`.
+fn remove_from_vec<T: PartialEq>(in_data: &mut Vec<T>, entry: &T) -> bool {
+    if let Some(idx) = get_entry_position(in_data, entry) {
+        in_data.remove(idx);
+        true
+    } else {
+        false
+    }
+}
+
+// Helper function to calculate column info recursively
+fn get_column_indices_helper(
+    indices: &mut Vec<(usize, String)>,
+    expr: &Arc<dyn PhysicalExpr>,
+) {
+    if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+        indices.push((col.index(), col.name().to_string()))
+    } else if let Some(binary_expr) = expr.as_any().downcast_ref::<BinaryExpr>() {
+        get_column_indices_helper(indices, binary_expr.left());
+        get_column_indices_helper(indices, binary_expr.right());
+    };
+}
+
+/// Get index and name of each column that is in the expression (Can return multiple entries for `BinaryExpr`s)
+fn get_column_indices(expr: &Arc<dyn PhysicalExpr>) -> Vec<(usize, String)> {
+    let mut result = vec![];
+    get_column_indices_helper(&mut result, expr);
+    result
+}
+
 /// `OrderingEquivalenceProperties` keeps track of columns that describe the
 /// global ordering of the schema. These columns are not necessarily same; e.g.
 /// ```text
@@ -130,34 +175,32 @@ impl<T: Eq + Hash + Clone> EquivalenceProperties<T> {
 /// where both `a ASC` and `b DESC` can describe the table ordering. With
 /// `OrderingEquivalenceProperties`, we can keep track of these equivalences
 /// and treat `a ASC` and `b DESC` as the same ordering requirement.
-pub type OrderingEquivalenceProperties = EquivalenceProperties<Vec<OrderedColumn>>;
+pub type OrderingEquivalenceProperties = EquivalenceProperties<LexOrdering>;
 
-/// EquivalentClass is a set of [`Column`]s or [`OrderedColumn`]s that are known
+/// EquivalentClass is a set of [`Column`]s or [`PhysicalSortExpr`]s that are known
 /// to have the same value in all tuples in a relation. `EquivalentClass<Column>`
 /// is generated by equality predicates, typically equijoin conditions and equality
-/// conditions in filters. `EquivalentClass<OrderedColumn>` is generated by the
+/// conditions in filters. `EquivalentClass<PhysicalSortExpr>` is generated by the
 /// `ROW_NUMBER` window function.
 #[derive(Debug, Clone)]
 pub struct EquivalentClass<T = Column> {
     /// First element in the EquivalentClass
     head: T,
     /// Other equal columns
-    others: HashSet<T>,
+    others: Vec<T>,
 }
 
-impl<T: Eq + Hash + Clone> EquivalentClass<T> {
+impl<T: PartialEq + Clone> EquivalentClass<T> {
     pub fn new(head: T, others: Vec<T>) -> EquivalentClass<T> {
-        EquivalentClass {
-            head,
-            others: HashSet::from_iter(others),
-        }
+        let others = deduplicate_vector(others);
+        EquivalentClass { head, others }
     }
 
     pub fn head(&self) -> &T {
         &self.head
     }
 
-    pub fn others(&self) -> &HashSet<T> {
+    pub fn others(&self) -> &[T] {
         &self.others
     }
 
@@ -166,15 +209,21 @@ impl<T: Eq + Hash + Clone> EquivalentClass<T> {
     }
 
     pub fn insert(&mut self, col: T) -> bool {
-        self.head != col && self.others.insert(col)
+        if self.head != col && !self.others.contains(&col) {
+            self.others.push(col);
+            true
+        } else {
+            false
+        }
     }
 
     pub fn remove(&mut self, col: &T) -> bool {
-        let removed = self.others.remove(col);
+        let removed = remove_from_vec(&mut self.others, col);
+        // If we are removing the head, shift others so that its first entry becomes the new head.
         if !removed && *col == self.head {
-            let one_col = self.others.iter().next().cloned();
+            let one_col = self.others.first().cloned();
             if let Some(col) = one_col {
-                let removed = self.others.remove(&col);
+                let removed = remove_from_vec(&mut self.others, &col);
                 self.head = col;
                 removed
             } else {
@@ -198,58 +247,7 @@ impl<T: Eq + Hash + Clone> EquivalentClass<T> {
     }
 }
 
-/// This object represents a [`Column`] with a definite ordering, for
-/// example `A ASC` and is used to represent equivalent orderings in
-/// the optimizer.
-#[derive(Debug, Hash, PartialEq, Eq, Clone)]
-pub struct OrderedColumn {
-    pub col: Column,
-    pub options: SortOptions,
-}
-
-impl OrderedColumn {
-    pub fn new(col: Column, options: SortOptions) -> Self {
-        Self { col, options }
-    }
-}
-
-impl From<OrderedColumn> for PhysicalSortExpr {
-    fn from(value: OrderedColumn) -> Self {
-        PhysicalSortExpr {
-            expr: Arc::new(value.col) as _,
-            options: value.options,
-        }
-    }
-}
-
-impl TryFrom<PhysicalSortExpr> for OrderedColumn {
-    type Error = DataFusionError;
-
-    fn try_from(value: PhysicalSortExpr) -> Result<Self, Self::Error> {
-        if let Some(col) = value.expr.as_any().downcast_ref::<Column>() {
-            Ok(OrderedColumn {
-                col: col.clone(),
-                options: value.options,
-            })
-        } else {
-            Err(DataFusionError::NotImplemented(
-                "Only Column PhysicalSortExpr's can be downcasted to OrderedColumn yet"
-                    .to_string(),
-            ))
-        }
-    }
-}
-
-impl From<OrderedColumn> for PhysicalSortRequirement {
-    fn from(value: OrderedColumn) -> Self {
-        PhysicalSortRequirement {
-            expr: Arc::new(value.col) as _,
-            options: Some(value.options),
-        }
-    }
-}
-
-/// `Vec<OrderedColumn>` stores the lexicographical ordering for a schema.
+/// `LexOrdering` stores the lexicographical ordering for a schema.
 /// OrderingEquivalentClass keeps track of different alternative orderings than can
 /// describe the schema.
 /// For instance, for the table below
@@ -260,7 +258,7 @@ impl From<OrderedColumn> for PhysicalSortRequirement {
 /// |3|2|1|3|
 /// both `vec![a ASC, b ASC]` and `vec![c DESC, d ASC]` describe the ordering of the table.
 /// For this case, we say that `vec![a ASC, b ASC]`, and `vec![c DESC, d ASC]` are ordering equivalent.
-pub type OrderingEquivalentClass = EquivalentClass<Vec<OrderedColumn>>;
+pub type OrderingEquivalentClass = EquivalentClass<LexOrdering>;
 
 impl OrderingEquivalentClass {
     /// This function extends ordering equivalences with alias information.
@@ -269,15 +267,17 @@ impl OrderingEquivalentClass {
     /// since b is alias of colum a. After this function (a ASC), (c DESC), (b ASC) would be ordering equivalent.
     fn update_with_aliases(&mut self, columns_map: &HashMap<Column, Vec<Column>>) {
         for (column, columns) in columns_map {
+            let col_expr = Arc::new(column.clone()) as Arc<dyn PhysicalExpr>;
             let mut to_insert = vec![];
             for ordering in std::iter::once(&self.head).chain(self.others.iter()) {
                 for (idx, item) in ordering.iter().enumerate() {
-                    if item.col.eq(column) {
+                    if item.expr.eq(&col_expr) {
                         for col in columns {
+                            let col_expr = Arc::new(col.clone()) as Arc<dyn PhysicalExpr>;
                             let mut normalized = self.head.clone();
                             // Change the corresponding entry in the head with the alias column:
                             let entry = &mut normalized[idx];
-                            (entry.col, entry.options) = (col.clone(), item.options);
+                            (entry.expr, entry.options) = (col_expr, item.options);
                             to_insert.push(normalized);
                         }
                     }
@@ -333,7 +333,10 @@ impl OrderingEquivalenceBuilder {
         self
     }
 
-    pub fn add_equal_conditions(&mut self, new_equivalent_ordering: Vec<OrderedColumn>) {
+    pub fn add_equal_conditions(
+        &mut self,
+        new_equivalent_ordering: Vec<PhysicalSortExpr>,
+    ) {
         let mut normalized_out_ordering = vec![];
         for item in &self.existing_ordering {
             // To account for ordering equivalences, first normalize the expression:
@@ -341,14 +344,10 @@ impl OrderingEquivalenceBuilder {
                 item.expr.clone(),
                 self.eq_properties.classes(),
             );
-            // Currently we only support ordering equivalences for `Column` expressions.
-            // TODO: Add support for ordering equivalence for all `PhysicalExpr`s.
-            if let Some(column) = normalized.as_any().downcast_ref::<Column>() {
-                normalized_out_ordering
-                    .push(OrderedColumn::new(column.clone(), item.options));
-            } else {
-                break;
-            }
+            normalized_out_ordering.push(PhysicalSortExpr {
+                expr: normalized,
+                options: item.options,
+            });
         }
         // If there is an existing ordering, add new ordering as an equivalence:
         if !normalized_out_ordering.is_empty() {
@@ -433,18 +432,22 @@ pub fn project_ordering_equivalence_properties(
     let schema = output_eq.schema();
     let fields = schema.fields();
     for class in eq_classes.iter_mut() {
-        let columns_to_remove = class
+        let sort_exprs_to_remove = class
             .iter()
-            .filter(|columns| {
-                columns.iter().any(|column| {
-                    let idx = column.col.index();
-                    idx >= fields.len() || fields[idx].name() != column.col.name()
+            .filter(|sort_exprs| {
+                sort_exprs.iter().any(|sort_expr| {
+                    let col_infos = get_column_indices(&sort_expr.expr);
+                    // If any of the columns used in the expression is invalid, remove the expression
+                    // from ordering equivalences
+                    col_infos.into_iter().any(|(idx, name)| {
+                        idx >= fields.len() || fields[idx].name() != &name
+                    })
                 })
             })
             .cloned()
             .collect::<Vec<_>>();
-        for column in columns_to_remove {
-            class.remove(&column);
+        for sort_exprs in sort_exprs_to_remove {
+            class.remove(&sort_exprs);
         }
     }
     eq_classes.retain(|props| props.len() > 1);
@@ -459,6 +462,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use datafusion_common::Result;
 
+    use datafusion_expr::Operator;
     use std::sync::Arc;
 
     #[test]
@@ -551,4 +555,52 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_deduplicate_vector() -> Result<()> {
+        assert_eq!(deduplicate_vector(vec![1, 1, 2, 3, 3]), vec![1, 2, 3]);
+        assert_eq!(
+            deduplicate_vector(vec![1, 2, 3, 4, 3, 2, 1, 0]),
+            vec![1, 2, 3, 4, 0]
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn test_get_entry_position() -> Result<()> {
+        assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &2), Some(2));
+        assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &1), Some(0));
+        assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &5), None);
+        Ok(())
+    }
+
+    #[test]
+    fn test_remove_from_vec() -> Result<()> {
+        let mut in_data = vec![1, 1, 2, 3, 3];
+        remove_from_vec(&mut in_data, &5);
+        assert_eq!(in_data, vec![1, 1, 2, 3, 3]);
+        remove_from_vec(&mut in_data, &2);
+        assert_eq!(in_data, vec![1, 1, 3, 3]);
+        remove_from_vec(&mut in_data, &2);
+        assert_eq!(in_data, vec![1, 1, 3, 3]);
+        remove_from_vec(&mut in_data, &3);
+        assert_eq!(in_data, vec![1, 1, 3]);
+        remove_from_vec(&mut in_data, &3);
+        assert_eq!(in_data, vec![1, 1]);
+        Ok(())
+    }
+
+    #[test]
+    fn test_get_column_infos() -> Result<()> {
+        let expr1 = Arc::new(Column::new("col1", 2)) as _;
+        assert_eq!(get_column_indices(&expr1), vec![(2, "col1".to_string())]);
+        let expr2 = Arc::new(Column::new("col2", 5)) as _;
+        assert_eq!(get_column_indices(&expr2), vec![(5, "col2".to_string())]);
+        let expr3 = Arc::new(BinaryExpr::new(expr1, Operator::Plus, expr2)) as _;
+        assert_eq!(
+            get_column_indices(&expr3),
+            vec![(2, "col1".to_string()), (5, "col2".to_string())]
+        );
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 494f35566d..710e9342b1 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -50,7 +50,7 @@ pub use aggregate::AggregateExpr;
 pub use datafusion_common::from_slice;
 pub use equivalence::{
     project_equivalence_properties, project_ordering_equivalence_properties,
-    EquivalenceProperties, EquivalentClass, OrderedColumn, OrderingEquivalenceProperties,
+    EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties,
     OrderingEquivalentClass,
 };
 pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalExprRef};
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index 9c28243bed..f95ec032eb 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -721,7 +721,7 @@ pub fn get_finer_ordering<
 mod tests {
     use super::*;
     use crate::expressions::{binary, cast, col, in_list, lit, Column, Literal};
-    use crate::{OrderedColumn, PhysicalSortExpr};
+    use crate::PhysicalSortExpr;
     use arrow::compute::SortOptions;
     use datafusion_common::{Result, ScalarValue};
     use std::fmt::{Display, Formatter};
@@ -809,17 +809,35 @@ mod tests {
         let mut ordering_eq_properties =
             OrderingEquivalenceProperties::new(test_schema.clone());
         ordering_eq_properties.add_equal_conditions((
-            &vec![OrderedColumn::new(col_a.clone(), option1)],
+            &vec![PhysicalSortExpr {
+                expr: Arc::new(col_a.clone()),
+                options: option1,
+            }],
             &vec![
-                OrderedColumn::new(col_d.clone(), option1),
-                OrderedColumn::new(col_b.clone(), option1),
+                PhysicalSortExpr {
+                    expr: Arc::new(col_d.clone()),
+                    options: option1,
+                },
+                PhysicalSortExpr {
+                    expr: Arc::new(col_b.clone()),
+                    options: option1,
+                },
             ],
         ));
         ordering_eq_properties.add_equal_conditions((
-            &vec![OrderedColumn::new(col_a.clone(), option1)],
+            &vec![PhysicalSortExpr {
+                expr: Arc::new(col_a.clone()),
+                options: option1,
+            }],
             &vec![
-                OrderedColumn::new(col_e.clone(), option2),
-                OrderedColumn::new(col_b.clone(), option1),
+                PhysicalSortExpr {
+                    expr: Arc::new(col_e.clone()),
+                    options: option2,
+                },
+                PhysicalSortExpr {
+                    expr: Arc::new(col_b.clone()),
+                    options: option1,
+                },
             ],
         ));
         Ok((test_schema, eq_properties, ordering_eq_properties))
@@ -1326,8 +1344,14 @@ mod tests {
         // Column a and e are ordering equivalent (e.g global ordering of the table can be described both as a ASC and e ASC.)
         let mut ordering_eq_properties = OrderingEquivalenceProperties::new(test_schema);
         ordering_eq_properties.add_equal_conditions((
-            &vec![OrderedColumn::new(col_a.clone(), option1)],
-            &vec![OrderedColumn::new(col_e.clone(), option1)],
+            &vec![PhysicalSortExpr {
+                expr: Arc::new(col_a.clone()),
+                options: option1,
+            }],
+            &vec![PhysicalSortExpr {
+                expr: Arc::new(col_e.clone()),
+                options: option1,
+            }],
         ));
         let sort_req_a = PhysicalSortExpr {
             expr: Arc::new((col_a).clone()) as _,