You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ak...@apache.org on 2023/06/05 07:43:44 UTC
[arrow-datafusion] branch main updated: Support ordering analysis with expressions (not just columns) by Replace `OrderedColumn` with `PhysicalSortExpr` (#6501)
This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0c88eecf9d Support ordering analysis with expressions (not just columns) by Replace `OrderedColumn` with `PhysicalSortExpr` (#6501)
0c88eecf9d is described below
commit 0c88eecf9d5ba09819b147ec1a0b074bd7ff75bd
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Mon Jun 5 10:43:37 2023 +0300
Support ordering analysis with expressions (not just columns) by Replace `OrderedColumn` with `PhysicalSortExpr` (#6501)
* remove hash dependency from EquivalenceProperties
* Convert OrderedColumn to PhysicalSortExpr
* Convert unit tests to new API.
* Simplifications
* Simplifications
* Add new test
* Update comments, move type definition to common place.
* Update comment
* change function name
---
.../core/src/physical_plan/aggregates/mod.rs | 14 +-
datafusion/core/src/physical_plan/mod.rs | 20 +-
datafusion/core/src/physical_plan/windows/mod.rs | 11 +-
.../core/tests/sqllogictests/test_files/window.slt | 24 +++
datafusion/physical-expr/src/equivalence.rs | 238 +++++++++++++--------
datafusion/physical-expr/src/lib.rs | 2 +-
datafusion/physical-expr/src/utils.rs | 42 +++-
7 files changed, 221 insertions(+), 130 deletions(-)
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 455a86660e..f96a09c8e9 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -1201,8 +1201,8 @@ mod tests {
lit, ApproxDistinct, Column, Count, FirstValue, Median,
};
use datafusion_physical_expr::{
- AggregateExpr, EquivalenceProperties, OrderedColumn,
- OrderingEquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+ AggregateExpr, EquivalenceProperties, OrderingEquivalenceProperties,
+ PhysicalExpr, PhysicalSortExpr,
};
use futures::{FutureExt, Stream};
use std::any::Any;
@@ -1860,8 +1860,14 @@ mod tests {
eq_properties.add_equal_conditions((&col_a, &col_b));
let mut ordering_eq_properties = OrderingEquivalenceProperties::new(test_schema);
ordering_eq_properties.add_equal_conditions((
- &vec![OrderedColumn::new(col_a.clone(), options1)],
- &vec![OrderedColumn::new(col_c.clone(), options2)],
+ &vec![PhysicalSortExpr {
+ expr: Arc::new(col_a.clone()) as _,
+ options: options1,
+ }],
+ &vec![PhysicalSortExpr {
+ expr: Arc::new(col_c.clone()) as _,
+ options: options2,
+ }],
));
let mut order_by_exprs = vec![
None,
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 155d79e7e8..deff619b4f 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -588,26 +588,10 @@ pub fn ordering_equivalence_properties_helper(
// Return an empty OrderingEquivalenceProperties:
return oep;
};
- let first_column = first_ordering
- .iter()
- .map(|e| TryFrom::try_from(e.clone()))
- .collect::<Result<Vec<_>>>();
- let checked_column_first = if let Ok(first) = first_column {
- first
- } else {
- // Return an empty OrderingEquivalenceProperties:
- return oep;
- };
// First entry among eq_orderings is the head, skip it:
for ordering in eq_orderings.iter().skip(1) {
- let column = ordering
- .iter()
- .map(|e| TryFrom::try_from(e.clone()))
- .collect::<Result<Vec<_>>>();
- if let Ok(column) = column {
- if !column.is_empty() {
- oep.add_equal_conditions((&checked_column_first, &column))
- }
+ if !ordering.is_empty() {
+ oep.add_equal_conditions((first_ordering, ordering))
}
}
oep
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index d7eedf7f18..f773f3b549 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -53,9 +53,7 @@ use datafusion_physical_expr::utils::{convert_to_expr, get_indices_of_matching_e
pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
};
-use datafusion_physical_expr::{
- OrderedColumn, OrderingEquivalenceProperties, PhysicalSortRequirement,
-};
+use datafusion_physical_expr::{OrderingEquivalenceProperties, PhysicalSortRequirement};
pub use window_agg_exec::WindowAggExec;
/// Create a physical expression for window function
@@ -270,14 +268,17 @@ pub(crate) fn window_ordering_equivalence(
.is::<RowNumber>()
{
if let Some((idx, field)) =
- schema.column_with_name(expr.field().unwrap().name())
+ schema.column_with_name(builtin_window_expr.name())
{
let column = Column::new(field.name(), idx);
let options = SortOptions {
descending: false,
nulls_first: false,
}; // ASC, NULLS LAST
- let rhs = OrderedColumn::new(column, options);
+ let rhs = PhysicalSortExpr {
+ expr: Arc::new(column) as _,
+ options,
+ };
builder.add_equal_conditions(vec![rhs]);
}
}
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt
index f1280fa688..c0b861fd8a 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -2405,6 +2405,30 @@ GlobalLimitExec: skip=0, fetch=5
------SortExec: expr=[c9@0 DESC]
--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], has_header=true
+# This test shows that ordering equivalence can keep track of complex expressions (not just Column expressions)
+# during ordering satisfy analysis. In the final plan we should see only a single SortExec.
+query TT
+EXPLAIN SELECT c5, c9, rn1 FROM (SELECT c5, c9,
+ ROW_NUMBER() OVER(ORDER BY c9 + c5 DESC) as rn1
+ FROM aggregate_test_100
+ ORDER BY c9 + c5 DESC)
+ ORDER BY rn1, c9 + c5 DESC
+ LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Sort: rn1 ASC NULLS LAST, CAST(aggregate_test_100.c9 AS Int32) + aggregate_test_100.c5 DESC NULLS FIRST, fetch=5
+----Sort: CAST(aggregate_test_100.c9 AS Int32) + aggregate_test_100.c5 DESC NULLS FIRST
+------Projection: aggregate_test_100.c5, aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [CAST(aggregate_test_100.c9 AS Int32) + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c5, c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+--ProjectionExec: expr=[c5@0 as c5, c9@1 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as rn1]
+----BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 + aggregate_test_100.c5 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_b [...]
+------SortExec: expr=[CAST(c9@1 AS Int32) + c5@0 DESC]
+--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5, c9], has_header=true
+
# The following query has type error. We should test the error could be detected
# from either the logical plan (when `skip_failed_rules` is set to `false`) or
# the physical plan (when `skip_failed_rules` is set to `true`).
diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs
index 83f26a1d06..78279851bb 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -15,17 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-use crate::expressions::Column;
+use crate::expressions::{BinaryExpr, Column};
use crate::{
- normalize_expr_with_equivalence_properties, PhysicalSortExpr, PhysicalSortRequirement,
+ normalize_expr_with_equivalence_properties, LexOrdering, PhysicalExpr,
+ PhysicalSortExpr,
};
use arrow::datatypes::SchemaRef;
-use arrow_schema::SortOptions;
-use datafusion_common::DataFusionError;
-use std::collections::{HashMap, HashSet};
-use std::hash::Hash;
+use std::collections::HashMap;
use std::sync::Arc;
/// Represents a collection of [`EquivalentClass`] (equivalences
@@ -34,14 +32,14 @@ use std::sync::Arc;
/// This is used to represent both:
///
/// 1. Equality conditions (like `A=B`), when `T` = [`Column`]
-/// 2. Ordering (like `A ASC = B ASC`), when `T` = [`OrderedColumn`]
+/// 2. Ordering (like `A ASC = B ASC`), when `T` = [`PhysicalSortExpr`]
#[derive(Debug, Clone)]
pub struct EquivalenceProperties<T = Column> {
classes: Vec<EquivalentClass<T>>,
schema: SchemaRef,
}
-impl<T: Eq + Hash + Clone> EquivalenceProperties<T> {
+impl<T: PartialEq + Clone> EquivalenceProperties<T> {
pub fn new(schema: SchemaRef) -> Self {
EquivalenceProperties {
classes: vec![],
@@ -115,6 +113,53 @@ impl<T: Eq + Hash + Clone> EquivalenceProperties<T> {
}
}
+/// Remove duplicates from the `in_data` vector; the returned vector consists of unique entries, in first-occurrence order.
+fn deduplicate_vector<T: PartialEq>(in_data: Vec<T>) -> Vec<T> {
+ let mut result = vec![];
+ for elem in in_data {
+ if !result.contains(&elem) {
+ result.push(elem);
+ }
+ }
+ result
+}
+
+/// Find the position of `entry` inside `in_data`, if `entry` is not found return `None`.
+fn get_entry_position<T: PartialEq>(in_data: &[T], entry: &T) -> Option<usize> {
+ in_data.iter().position(|item| item.eq(entry))
+}
+
+/// Remove the first occurrence of `entry` from `in_data`. Returns `true` if the removal is successful (i.e. `entry` is indeed in `in_data`);
+/// otherwise returns `false`.
+fn remove_from_vec<T: PartialEq>(in_data: &mut Vec<T>, entry: &T) -> bool {
+ if let Some(idx) = get_entry_position(in_data, entry) {
+ in_data.remove(idx);
+ true
+ } else {
+ false
+ }
+}
+
+// Helper function to calculate column info recursively
+fn get_column_indices_helper(
+ indices: &mut Vec<(usize, String)>,
+ expr: &Arc<dyn PhysicalExpr>,
+) {
+ if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+ indices.push((col.index(), col.name().to_string()))
+ } else if let Some(binary_expr) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ get_column_indices_helper(indices, binary_expr.left());
+ get_column_indices_helper(indices, binary_expr.right());
+ };
+}
+
+/// Get the index and name of each column in the expression (can return multiple entries for `BinaryExpr`s).
+fn get_column_indices(expr: &Arc<dyn PhysicalExpr>) -> Vec<(usize, String)> {
+ let mut result = vec![];
+ get_column_indices_helper(&mut result, expr);
+ result
+}
+
/// `OrderingEquivalenceProperties` keeps track of columns that describe the
/// global ordering of the schema. These columns are not necessarily same; e.g.
/// ```text
@@ -130,34 +175,32 @@ impl<T: Eq + Hash + Clone> EquivalenceProperties<T> {
/// where both `a ASC` and `b DESC` can describe the table ordering. With
/// `OrderingEquivalenceProperties`, we can keep track of these equivalences
/// and treat `a ASC` and `b DESC` as the same ordering requirement.
-pub type OrderingEquivalenceProperties = EquivalenceProperties<Vec<OrderedColumn>>;
+pub type OrderingEquivalenceProperties = EquivalenceProperties<LexOrdering>;
-/// EquivalentClass is a set of [`Column`]s or [`OrderedColumn`]s that are known
+/// EquivalentClass is a set of [`Column`]s or [`PhysicalSortExpr`]s that are known
/// to have the same value in all tuples in a relation. `EquivalentClass<Column>`
/// is generated by equality predicates, typically equijoin conditions and equality
-/// conditions in filters. `EquivalentClass<OrderedColumn>` is generated by the
+/// conditions in filters. `EquivalentClass<PhysicalSortExpr>` is generated by the
/// `ROW_NUMBER` window function.
#[derive(Debug, Clone)]
pub struct EquivalentClass<T = Column> {
/// First element in the EquivalentClass
head: T,
/// Other equal columns
- others: HashSet<T>,
+ others: Vec<T>,
}
-impl<T: Eq + Hash + Clone> EquivalentClass<T> {
+impl<T: PartialEq + Clone> EquivalentClass<T> {
pub fn new(head: T, others: Vec<T>) -> EquivalentClass<T> {
- EquivalentClass {
- head,
- others: HashSet::from_iter(others),
- }
+ let others = deduplicate_vector(others);
+ EquivalentClass { head, others }
}
pub fn head(&self) -> &T {
&self.head
}
- pub fn others(&self) -> &HashSet<T> {
+ pub fn others(&self) -> &[T] {
&self.others
}
@@ -166,15 +209,21 @@ impl<T: Eq + Hash + Clone> EquivalentClass<T> {
}
pub fn insert(&mut self, col: T) -> bool {
- self.head != col && self.others.insert(col)
+ if self.head != col && !self.others.contains(&col) {
+ self.others.push(col);
+ true
+ } else {
+ false
+ }
}
pub fn remove(&mut self, col: &T) -> bool {
- let removed = self.others.remove(col);
+ let removed = remove_from_vec(&mut self.others, col);
+ // If we are removing the head, shift others so that its first entry becomes the new head.
if !removed && *col == self.head {
- let one_col = self.others.iter().next().cloned();
+ let one_col = self.others.first().cloned();
if let Some(col) = one_col {
- let removed = self.others.remove(&col);
+ let removed = remove_from_vec(&mut self.others, &col);
self.head = col;
removed
} else {
@@ -198,58 +247,7 @@ impl<T: Eq + Hash + Clone> EquivalentClass<T> {
}
}
-/// This object represents a [`Column`] with a definite ordering, for
-/// example `A ASC` and is used to represent equivalent orderings in
-/// the optimizer.
-#[derive(Debug, Hash, PartialEq, Eq, Clone)]
-pub struct OrderedColumn {
- pub col: Column,
- pub options: SortOptions,
-}
-
-impl OrderedColumn {
- pub fn new(col: Column, options: SortOptions) -> Self {
- Self { col, options }
- }
-}
-
-impl From<OrderedColumn> for PhysicalSortExpr {
- fn from(value: OrderedColumn) -> Self {
- PhysicalSortExpr {
- expr: Arc::new(value.col) as _,
- options: value.options,
- }
- }
-}
-
-impl TryFrom<PhysicalSortExpr> for OrderedColumn {
- type Error = DataFusionError;
-
- fn try_from(value: PhysicalSortExpr) -> Result<Self, Self::Error> {
- if let Some(col) = value.expr.as_any().downcast_ref::<Column>() {
- Ok(OrderedColumn {
- col: col.clone(),
- options: value.options,
- })
- } else {
- Err(DataFusionError::NotImplemented(
- "Only Column PhysicalSortExpr's can be downcasted to OrderedColumn yet"
- .to_string(),
- ))
- }
- }
-}
-
-impl From<OrderedColumn> for PhysicalSortRequirement {
- fn from(value: OrderedColumn) -> Self {
- PhysicalSortRequirement {
- expr: Arc::new(value.col) as _,
- options: Some(value.options),
- }
- }
-}
-
-/// `Vec<OrderedColumn>` stores the lexicographical ordering for a schema.
+/// `LexOrdering` stores the lexicographical ordering for a schema.
/// OrderingEquivalentClass keeps track of different alternative orderings than can
/// describe the schema.
/// For instance, for the table below
@@ -260,7 +258,7 @@ impl From<OrderedColumn> for PhysicalSortRequirement {
/// |3|2|1|3|
/// both `vec![a ASC, b ASC]` and `vec![c DESC, d ASC]` describe the ordering of the table.
/// For this case, we say that `vec![a ASC, b ASC]`, and `vec![c DESC, d ASC]` are ordering equivalent.
-pub type OrderingEquivalentClass = EquivalentClass<Vec<OrderedColumn>>;
+pub type OrderingEquivalentClass = EquivalentClass<LexOrdering>;
impl OrderingEquivalentClass {
/// This function extends ordering equivalences with alias information.
@@ -269,15 +267,17 @@ impl OrderingEquivalentClass {
/// since b is alias of colum a. After this function (a ASC), (c DESC), (b ASC) would be ordering equivalent.
fn update_with_aliases(&mut self, columns_map: &HashMap<Column, Vec<Column>>) {
for (column, columns) in columns_map {
+ let col_expr = Arc::new(column.clone()) as Arc<dyn PhysicalExpr>;
let mut to_insert = vec![];
for ordering in std::iter::once(&self.head).chain(self.others.iter()) {
for (idx, item) in ordering.iter().enumerate() {
- if item.col.eq(column) {
+ if item.expr.eq(&col_expr) {
for col in columns {
+ let col_expr = Arc::new(col.clone()) as Arc<dyn PhysicalExpr>;
let mut normalized = self.head.clone();
// Change the corresponding entry in the head with the alias column:
let entry = &mut normalized[idx];
- (entry.col, entry.options) = (col.clone(), item.options);
+ (entry.expr, entry.options) = (col_expr, item.options);
to_insert.push(normalized);
}
}
@@ -333,7 +333,10 @@ impl OrderingEquivalenceBuilder {
self
}
- pub fn add_equal_conditions(&mut self, new_equivalent_ordering: Vec<OrderedColumn>) {
+ pub fn add_equal_conditions(
+ &mut self,
+ new_equivalent_ordering: Vec<PhysicalSortExpr>,
+ ) {
let mut normalized_out_ordering = vec![];
for item in &self.existing_ordering {
// To account for ordering equivalences, first normalize the expression:
@@ -341,14 +344,10 @@ impl OrderingEquivalenceBuilder {
item.expr.clone(),
self.eq_properties.classes(),
);
- // Currently we only support ordering equivalences for `Column` expressions.
- // TODO: Add support for ordering equivalence for all `PhysicalExpr`s.
- if let Some(column) = normalized.as_any().downcast_ref::<Column>() {
- normalized_out_ordering
- .push(OrderedColumn::new(column.clone(), item.options));
- } else {
- break;
- }
+ normalized_out_ordering.push(PhysicalSortExpr {
+ expr: normalized,
+ options: item.options,
+ });
}
// If there is an existing ordering, add new ordering as an equivalence:
if !normalized_out_ordering.is_empty() {
@@ -433,18 +432,22 @@ pub fn project_ordering_equivalence_properties(
let schema = output_eq.schema();
let fields = schema.fields();
for class in eq_classes.iter_mut() {
- let columns_to_remove = class
+ let sort_exprs_to_remove = class
.iter()
- .filter(|columns| {
- columns.iter().any(|column| {
- let idx = column.col.index();
- idx >= fields.len() || fields[idx].name() != column.col.name()
+ .filter(|sort_exprs| {
+ sort_exprs.iter().any(|sort_expr| {
+ let col_infos = get_column_indices(&sort_expr.expr);
+ // If any one of the columns used in the expression is invalid, remove the expression
+ // from the ordering equivalences
+ col_infos.into_iter().any(|(idx, name)| {
+ idx >= fields.len() || fields[idx].name() != &name
+ })
})
})
.cloned()
.collect::<Vec<_>>();
- for column in columns_to_remove {
- class.remove(&column);
+ for sort_exprs in sort_exprs_to_remove {
+ class.remove(&sort_exprs);
}
}
eq_classes.retain(|props| props.len() > 1);
@@ -459,6 +462,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
+ use datafusion_expr::Operator;
use std::sync::Arc;
#[test]
@@ -551,4 +555,52 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn test_deduplicate_vector() -> Result<()> {
+ assert_eq!(deduplicate_vector(vec![1, 1, 2, 3, 3]), vec![1, 2, 3]);
+ assert_eq!(
+ deduplicate_vector(vec![1, 2, 3, 4, 3, 2, 1, 0]),
+ vec![1, 2, 3, 4, 0]
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_get_entry_position() -> Result<()> {
+ assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &2), Some(2));
+ assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &1), Some(0));
+ assert_eq!(get_entry_position(&[1, 1, 2, 3, 3], &5), None);
+ Ok(())
+ }
+
+ #[test]
+ fn test_remove_from_vec() -> Result<()> {
+ let mut in_data = vec![1, 1, 2, 3, 3];
+ remove_from_vec(&mut in_data, &5);
+ assert_eq!(in_data, vec![1, 1, 2, 3, 3]);
+ remove_from_vec(&mut in_data, &2);
+ assert_eq!(in_data, vec![1, 1, 3, 3]);
+ remove_from_vec(&mut in_data, &2);
+ assert_eq!(in_data, vec![1, 1, 3, 3]);
+ remove_from_vec(&mut in_data, &3);
+ assert_eq!(in_data, vec![1, 1, 3]);
+ remove_from_vec(&mut in_data, &3);
+ assert_eq!(in_data, vec![1, 1]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_get_column_infos() -> Result<()> {
+ let expr1 = Arc::new(Column::new("col1", 2)) as _;
+ assert_eq!(get_column_indices(&expr1), vec![(2, "col1".to_string())]);
+ let expr2 = Arc::new(Column::new("col2", 5)) as _;
+ assert_eq!(get_column_indices(&expr2), vec![(5, "col2".to_string())]);
+ let expr3 = Arc::new(BinaryExpr::new(expr1, Operator::Plus, expr2)) as _;
+ assert_eq!(
+ get_column_indices(&expr3),
+ vec![(2, "col1".to_string()), (5, "col2".to_string())]
+ );
+ Ok(())
+ }
}
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 494f35566d..710e9342b1 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -50,7 +50,7 @@ pub use aggregate::AggregateExpr;
pub use datafusion_common::from_slice;
pub use equivalence::{
project_equivalence_properties, project_ordering_equivalence_properties,
- EquivalenceProperties, EquivalentClass, OrderedColumn, OrderingEquivalenceProperties,
+ EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties,
OrderingEquivalentClass,
};
pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalExprRef};
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index 9c28243bed..f95ec032eb 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -721,7 +721,7 @@ pub fn get_finer_ordering<
mod tests {
use super::*;
use crate::expressions::{binary, cast, col, in_list, lit, Column, Literal};
- use crate::{OrderedColumn, PhysicalSortExpr};
+ use crate::PhysicalSortExpr;
use arrow::compute::SortOptions;
use datafusion_common::{Result, ScalarValue};
use std::fmt::{Display, Formatter};
@@ -809,17 +809,35 @@ mod tests {
let mut ordering_eq_properties =
OrderingEquivalenceProperties::new(test_schema.clone());
ordering_eq_properties.add_equal_conditions((
- &vec![OrderedColumn::new(col_a.clone(), option1)],
+ &vec![PhysicalSortExpr {
+ expr: Arc::new(col_a.clone()),
+ options: option1,
+ }],
&vec![
- OrderedColumn::new(col_d.clone(), option1),
- OrderedColumn::new(col_b.clone(), option1),
+ PhysicalSortExpr {
+ expr: Arc::new(col_d.clone()),
+ options: option1,
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(col_b.clone()),
+ options: option1,
+ },
],
));
ordering_eq_properties.add_equal_conditions((
- &vec![OrderedColumn::new(col_a.clone(), option1)],
+ &vec![PhysicalSortExpr {
+ expr: Arc::new(col_a.clone()),
+ options: option1,
+ }],
&vec![
- OrderedColumn::new(col_e.clone(), option2),
- OrderedColumn::new(col_b.clone(), option1),
+ PhysicalSortExpr {
+ expr: Arc::new(col_e.clone()),
+ options: option2,
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(col_b.clone()),
+ options: option1,
+ },
],
));
Ok((test_schema, eq_properties, ordering_eq_properties))
@@ -1326,8 +1344,14 @@ mod tests {
// Column a and e are ordering equivalent (e.g global ordering of the table can be described both as a ASC and e ASC.)
let mut ordering_eq_properties = OrderingEquivalenceProperties::new(test_schema);
ordering_eq_properties.add_equal_conditions((
- &vec![OrderedColumn::new(col_a.clone(), option1)],
- &vec![OrderedColumn::new(col_e.clone(), option1)],
+ &vec![PhysicalSortExpr {
+ expr: Arc::new(col_a.clone()),
+ options: option1,
+ }],
+ &vec![PhysicalSortExpr {
+ expr: Arc::new(col_e.clone()),
+ options: option1,
+ }],
));
let sort_req_a = PhysicalSortExpr {
expr: Arc::new((col_a).clone()) as _,