You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/05/24 18:52:26 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6431: Lexicographical Ordering Equivalence Support

alamb commented on code in PR #6431:
URL: https://github.com/apache/arrow-datafusion/pull/6431#discussion_r1204627658


##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -198,36 +200,50 @@ impl OrderedColumn {
     }
 }
 
-trait ColumnAccessor {
-    fn column(&self) -> &Column;
-}
-
-impl ColumnAccessor for Column {
-    fn column(&self) -> &Column {
-        self
+impl From<OrderedColumn> for PhysicalSortExpr {
+    fn from(value: OrderedColumn) -> Self {
+        PhysicalSortExpr {
+            expr: Arc::new(value.col) as _,
+            options: value.options,
+        }
     }
 }
 
-impl ColumnAccessor for OrderedColumn {
-    fn column(&self) -> &Column {
-        &self.col
+impl From<OrderedColumn> for PhysicalSortRequirement {
+    fn from(value: OrderedColumn) -> Self {
+        PhysicalSortRequirement {
+            expr: Arc::new(value.col) as _,
+            options: Some(value.options),
+        }
     }
 }
 
-pub type OrderingEquivalentClass = EquivalentClass<OrderedColumn>;
+pub type OrderingEquivalentClass = EquivalentClass<Vec<OrderedColumn>>;

Review Comment:
   I think it would help to add some doc comments here explaining that the `Vec` is the set of columns that is equivalent, as well as mentioning what significance, if any, the ordering of the `OrderedColumn`s in this type has.



##########
datafusion/core/src/physical_plan/windows/mod.rs:
##########
@@ -262,7 +262,22 @@ pub(crate) fn window_ordering_equivalence(
             .iter()
             .cloned(),
     );
-    let out_ordering = input.output_ordering().unwrap_or(&[]);
+    let mut normalized_out_ordering = vec![];
+    for item in input.output_ordering().unwrap_or(&[]) {
+        // To account for ordering equivalences, first normalize the expression:
+        let normalized = normalize_expr_with_equivalence_properties(
+            item.expr.clone(),
+            input.equivalence_properties().classes(),
+        );
+        // Currently we only support, ordering equivalences for `Column` expressions.
+        // TODO: Add support for ordering equivalence for all `PhysicalExpr`s

Review Comment:
   I agree this would be nice to do.
   
   I suggest that, if you want to head in this direction, we start trying to encapsulate the code for working with equivalence classes, required sort order, and input orderings into a single location that is disconnected from the individual physical operators.
   
   For example, I wonder if we could end up with an API like:
   
   ```rust
   let mut builder = OrderingEquivalenceBuilder::new(input.output_ordering())
     .with_equivalences(input.equivalence_properties());
   
   ...
   
               // Only the built-in `RowNumber` window function introduces a new
               // ordering:
               if builtin_window_expr
                   .get_built_in_func_expr()
                   .as_any()
                   .is::<RowNumber>()
               {
                           let column = Column::new(field.name(), idx);
                           let options = SortOptions {
                               descending: false,
                               nulls_first: false,
                           }; // ASC, NULLS LAST
                           let rhs = OrderedColumn::new(column, options);
   
                  builder.add_equal_condition(rhs)
             }
   ...
   builder.build()
   ...
   ```
   



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -237,47 +173,105 @@ fn normalize_sort_requirement_with_equivalence_properties(
     sort_requirement
 }
 
-fn normalize_sort_requirement_with_ordering_equivalence_properties(
-    mut sort_requirement: PhysicalSortRequirement,
-    eq_properties: &[OrderingEquivalentClass],
-) -> PhysicalSortRequirement {
-    if let Some(options) = &mut sort_requirement.options {
-        (sort_requirement.expr, *options) =
-            normalize_expr_with_ordering_equivalence_properties(
-                sort_requirement.expr,
-                *options,
-                eq_properties,
-            );
+/// This function searches for the slice `section` inside the slice `given`.
+/// It returns each range where `section` is compatible with the corresponding
+/// slice in `given`.
+fn get_compatible_ranges(
+    given: &[PhysicalSortRequirement],
+    section: &[PhysicalSortRequirement],
+) -> Vec<Range<usize>> {
+    let n_section = section.len();
+    let n_end = if given.len() >= n_section {
+        given.len() - n_section + 1
+    } else {
+        0
+    };
+    (0..n_end)
+        .filter_map(|idx| {
+            let end = idx + n_section;
+            given[idx..end]
+                .iter()
+                .zip(section)
+                .all(|(req, given)| given.compatible(req))
+                .then_some(Range { start: idx, end })
+        })
+        .collect()
+}
+
+/// This function constructs a duplicate-free vector by filtering out duplicate
+/// entries inside the given vector `input`.
+fn collapse_vec<T: PartialEq>(input: Vec<T>) -> Vec<T> {
+    let mut output = vec![];
+    for item in input {
+        if !output.contains(&item) {
+            output.push(item);
+        }
     }
-    sort_requirement
+    output
 }
 
-pub fn normalize_sort_expr(
-    sort_expr: PhysicalSortExpr,
+pub fn normalize_sort_exprs(

Review Comment:
   I wonder if we can add some doc comments here explaining what this code is doing (i.e., what is it normalizing against?).



##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -198,36 +200,50 @@ impl OrderedColumn {
     }
 }
 
-trait ColumnAccessor {
-    fn column(&self) -> &Column;
-}
-
-impl ColumnAccessor for Column {
-    fn column(&self) -> &Column {
-        self
+impl From<OrderedColumn> for PhysicalSortExpr {
+    fn from(value: OrderedColumn) -> Self {
+        PhysicalSortExpr {
+            expr: Arc::new(value.col) as _,
+            options: value.options,
+        }
     }
 }
 
-impl ColumnAccessor for OrderedColumn {
-    fn column(&self) -> &Column {
-        &self.col
+impl From<OrderedColumn> for PhysicalSortRequirement {
+    fn from(value: OrderedColumn) -> Self {
+        PhysicalSortRequirement {
+            expr: Arc::new(value.col) as _,
+            options: Some(value.options),
+        }
     }
 }
 
-pub type OrderingEquivalentClass = EquivalentClass<OrderedColumn>;
+pub type OrderingEquivalentClass = EquivalentClass<Vec<OrderedColumn>>;
 
 impl OrderingEquivalentClass {
-    /// Finds the matching column inside the `OrderingEquivalentClass`.
-    fn get_matching_column(&self, column: &Column) -> Option<OrderedColumn> {
-        if self.head.col.eq(column) {
-            Some(self.head.clone())
-        } else {
-            for item in &self.others {
-                if item.col.eq(column) {
-                    return Some(item.clone());
+    /// This function extends ordering equivalences with alias information.

Review Comment:
   I recommend making these doc comments (i.e., using `///`) for uniformity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org