Posted to github@arrow.apache.org by "mustafasrepo (via GitHub)" <gi...@apache.org> on 2023/04/29 16:56:57 UTC

[GitHub] [arrow-datafusion] mustafasrepo opened a new pull request, #6160: Add Support for Ordering Equivalence

mustafasrepo opened a new pull request, #6160:
URL: https://github.com/apache/arrow-datafusion/pull/6160

   # Which issue does this PR close?
   
   Closes [#6159](https://github.com/apache/arrow-datafusion/issues/6159).
   
   # Rationale for this change
   
   DataFusion can already detect aliased columns (columns that hold exactly the same values), which enables additional optimizations during planning. However, the same cannot yet be done for orderings.
   Consider the query below:
   ```sql
   (SELECT c9,
        ROW_NUMBER() OVER(ORDER BY c9 ASC) as rn1
        FROM aggregate_test_100
        ORDER BY c9 ASC)
   ```
   The output of this query satisfies `c9 ASC`. Because `ROW_NUMBER()` numbers the rows in `c9 ASC` order, the output also satisfies `rn1 ASC`. Currently, however, we cannot detect this.
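   To make the claim concrete, here is a toy Rust emulation of the inner query on an in-memory column. It is not DataFusion code: `c9_with_row_number` and `is_sorted_asc` are hypothetical helpers. The rows are sorted by `c9` and then given 1-based row numbers in that order, so the `c9` and `rn1` columns both come out in ascending order:

   ```rust
   /// Toy emulation of
   /// `SELECT c9, ROW_NUMBER() OVER (ORDER BY c9 ASC) AS rn1 ... ORDER BY c9 ASC`:
   /// sort the rows by c9, then number them 1, 2, 3, ... in that order.
   fn c9_with_row_number(mut c9: Vec<u64>) -> Vec<(u64, u64)> {
       c9.sort_unstable();
       c9.into_iter().zip(1u64..).collect()
   }

   /// True if `values` is in non-decreasing (ASC) order.
   fn is_sorted_asc(values: &[u64]) -> bool {
       values.windows(2).all(|w| w[0] <= w[1])
   }
   ```

   Even if `c9` contained duplicates, `rn1` would still increase strictly, so the row order would satisfy both `c9 ASC` and `rn1 ASC`.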
   Now consider the following query:
   ```sql
   SELECT c9, rn1 FROM (SELECT c9,
          ROW_NUMBER() OVER(ORDER BY c9 ASC) as rn1
          FROM aggregate_test_100
          ORDER BY c9 ASC)
      ORDER BY rn1
   ```
   It produces the following plan
   ```
   "SortExec: expr=[rn1@1 ASC NULLS LAST]",
   "  ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]",
   "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }], mode=[Sorted]",
   "      SortExec: expr=[c9@0 ASC NULLS LAST]",
   "        CsvExec: files={1 group: [[SOURCE_PATH]]}, has_header=true, limit=None, projection=[c9]",
   ```
   If we could detect that `c9 ASC` and `rn1 ASC` define the same ordering, we could instead produce the following plan, without the top-level `SortExec`:
   ```
   "ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]",
   "  BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }], mode=[Sorted]",
   "    SortExec: expr=[c9@0 ASC NULLS LAST]",
   "      CsvExec: files={1 group: [[SOURCE_PATH]]}, has_header=true, limit=None, projection=[c9]",
   ```
   
   # What changes are included in this PR?
   
   This PR adds a new method called `ordering_equivalence_properties` to the `ExecutionPlan` trait. It is similar to `equivalence_properties`: whereas `equivalence_properties` returns columns that are aliases of each other, `ordering_equivalence_properties` returns columns that each define the global ordering of the schema (i.e. they are equivalent in the ordering sense). This lets us optimize away unnecessary `SortExec`s in the final plan.
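   A minimal sketch of how such properties can be used, assuming simplified types: `OrderedCol`, `OrderingClass`, `normalize` and `needs_sort` are hypothetical names, not DataFusion's API. Both the requirement and the existing output ordering are rewritten to a canonical member of their ordering equivalence class before being compared, so a requirement of `rn1 ASC` is recognized as already satisfied by output sorted on `c9 ASC`:

   ```rust
   /// A single-column sort key such as `c9 ASC` (simplified stand-in,
   /// not DataFusion's `OrderedColumn`).
   #[derive(Debug, Clone, PartialEq, Eq)]
   pub struct OrderedCol {
       pub name: String,
       pub descending: bool,
   }

   impl OrderedCol {
       pub fn new(name: &str, descending: bool) -> Self {
           Self { name: name.to_string(), descending }
       }
   }

   /// Sort keys that all describe the same global ordering;
   /// `members[0]` is used as the canonical representative.
   pub struct OrderingClass {
       pub members: Vec<OrderedCol>,
   }

   /// Maps `key` to the representative of the first class containing it,
   /// or returns it unchanged if no class contains it.
   pub fn normalize(key: &OrderedCol, classes: &[OrderingClass]) -> OrderedCol {
       classes
           .iter()
           .find(|class| class.members.contains(key))
           .map(|class| class.members[0].clone())
           .unwrap_or_else(|| key.clone())
   }

   /// Decides whether a node whose output is already sorted by `existing`
   /// needs an extra sort to satisfy the single-key requirement `required`.
   pub fn needs_sort(existing: &[OrderedCol], required: &OrderedCol, classes: &[OrderingClass]) -> bool {
       match existing.first() {
           Some(leading) => normalize(leading, classes) != normalize(required, classes),
           None => true,
       }
   }
   ```

   This only handles single-key requirements against the leading key of the existing ordering; multi-column orderings would need the same normalization applied element by element.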
   
   # Are these changes tested?
   Yes, two new tests are added to the `window.slt` file.
   
   # Are there any user-facing changes?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb merged pull request #6160: Add Support for Ordering Equivalence

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #6160:
URL: https://github.com/apache/arrow-datafusion/pull/6160


[GitHub] [arrow-datafusion] mustafasrepo commented on pull request #6160: Add Support for Ordering Equivalence

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on PR #6160:
URL: https://github.com/apache/arrow-datafusion/pull/6160#issuecomment-1529101017

   > Thank you for this PR @mustafasrepo -- I skimmed this PR
   > 
   > and one question I have is why add a new `ordering_equivalence_properties` when DataFusion already has `equivalence_properties` which seems to be a more general concept?
   > 
   > It is confusing to me because the existence of `ordering_equivalence_properties` implies that `equivalence_properties` doesn't apply when ordering.
   > 
   > If the existing `equivalence_properties` doesn't support ordering aliasing, did you consider making it more general?
   
   As far as I know, `equivalence_properties` keeps track of columns that hold exactly the same values, as in the table below:

   | a | a_alias |
   | - | ------- |
   | 1 | 1       |
   | 2 | 2       |
   | 3 | 3       |
   | 5 | 5       |
   
   `ordering_equivalence_properties`, on the other hand, keeps track of columns that can each describe the global ordering of the schema. These columns do not necessarily hold the same values, so they cannot be expressed with the current `equivalence_properties` without modification. Consider the table:

   | a | b |
   | - | - |
   | 1 | 9 |
   | 2 | 8 |
   | 3 | 7 |
   | 5 | 5 |
   
   Both `a ASC` and `b DESC` describe the ordering of this table. If we run the query
   ```sql
   SELECT *
   FROM table1
   ORDER BY a
   ```
   we do not need to add a `SortExec` to the final plan, since the table is already ordered by `a ASC`. Similarly, if we run the query
   ```sql
   SELECT *
   FROM table1
   ORDER BY b DESC
   ```
   we do not need to add a `SortExec` to the final plan, since the table is already ordered by `b DESC`.
   Ordering equivalence keeps track of such equalities and treats `a ASC` and `b DESC` as the same requirement. As far as I am aware, the existing APIs cannot describe this.
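   The distinction can be checked on the two tables above with a small sketch (hypothetical helpers `same_values` and `ordered_by`, operating on plain `i64` slices rather than Arrow arrays). `a` and `a_alias` are equal row by row while `a` and `b` are not, yet the current row order satisfies both `a ASC` and `b DESC`:

   ```rust
   /// `equivalence_properties`-style check: two columns are interchangeable
   /// only if they hold identical values in every row.
   fn same_values(x: &[i64], y: &[i64]) -> bool {
       x == y
   }

   /// Checks whether the current row order satisfies `column ASC`
   /// (`descending == false`) or `column DESC` (`descending == true`).
   fn ordered_by(column: &[i64], descending: bool) -> bool {
       column
           .windows(2)
           .all(|w| if descending { w[0] >= w[1] } else { w[0] <= w[1] })
   }
   ```

   A pair like `a`/`b` fails the value-equality test that alias tracking relies on, which is why it needs a separate, direction-aware notion of equivalence.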


[GitHub] [arrow-datafusion] jackwener commented on pull request #6160: Add Support for Ordering Equivalence

Posted by "jackwener (via GitHub)" <gi...@apache.org>.
jackwener commented on PR #6160:
URL: https://github.com/apache/arrow-datafusion/pull/6160#issuecomment-1531246507

   cc @mingmwang 


[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #6160: Add Support for Ordering Equivalence

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #6160:
URL: https://github.com/apache/arrow-datafusion/pull/6160#discussion_r1183199984


##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2204,3 +2204,163 @@ SELECT SUM(c12) OVER(ORDER BY c1, c2 GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
 2.994840293343 NULL
 9.674390599321 NULL
 7.728066219895 NULL
+
+# test_c9_rn_ordering_alias
+# These tests check whether Datafusion is aware of the ordering generated by the ROW_NUMBER() window function.
+# Physical plan shouldn't have a SortExec after the BoundedWindowAggExec since the table after BoundedWindowAggExec is already ordered by rn1 ASC and c9 ASC.
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 ASC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 ASC)
+   ORDER BY rn1
+   LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: rn1 ASC NULLS LAST, fetch=5
+    Sort: aggregate_test_100.c9 ASC NULLS LAST
+      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+          TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
+    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+      SortExec: expr=[c9@0 ASC NULLS LAST]
+        CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+
+query II
+SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 ASC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 ASC)
+   ORDER BY rn1
+   LIMIT 5
+----
+28774375 1
+63044568 2
+141047417 3
+141680161 4
+145294611 5
+
+# test_c9_rn_ordering_alias_opposite_direction
+# These tests check whether Datafusion is aware of the ordering generated by the ROW_NUMBER() window function.
+# Physical plan shouldn't have a SortExec after the BoundedWindowAggExec since the table after BoundedWindowAggExec is already ordered by rn1 ASC and c9 DESC.
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 DESC)
+   ORDER BY rn1
+   LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: rn1 ASC NULLS LAST, fetch=5
+    Sort: aggregate_test_100.c9 DESC NULLS FIRST
+      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+          TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
+    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+      SortExec: expr=[c9@0 DESC]
+        CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+
+query II
+SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 DESC)
+   ORDER BY rn1
+   LIMIT 5
+----
+4268716378 1
+4229654142 2
+4216440507 3
+4144173353 4
+4076864659 5
+
+# test_c9_rn_ordering_alias_opposite_direction2
+# These tests check whether Datafusion is aware of the ordering generated by the ROW_NUMBER() window function.
+# Physical plan _should_ have a SortExec after BoundedWindowAggExec since the table after BoundedWindowAggExec is ordered by rn1 ASC and c9 DESC, which is conflicting with the requirement rn1 DESC.
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 DESC)
+   ORDER BY rn1 DESC
+   LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: rn1 DESC NULLS FIRST, fetch=5
+    Sort: aggregate_test_100.c9 DESC NULLS FIRST
+      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+          TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  SortExec: fetch=5, expr=[rn1@1 DESC]
+    ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
+      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+        SortExec: expr=[c9@0 DESC]
+          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+
+query II
+SELECT c9, rn1 FROM (SELECT c9,
+               ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+               FROM aggregate_test_100
+               ORDER BY c9 DESC)
+   ORDER BY rn1 DESC
+   LIMIT 5
+----
+28774375 100
+63044568 99
+141047417 98
+141680161 97
+145294611 96
+
+# test_c9_rn_ordering_alias_opposite_direction3
+# These test check for whether datafusion is aware of the ordering of the column generated by ROW_NUMBER() window function.
+# Physical plan should have a SortExec after BoundedWindowAggExec.
+# Since table after BoundedWindowAggExec is ordered by rn1 ASC, and c9 DESC. And it is conflicting with requirement rn1 ASC, c9 DESC
+# (possibly violates global ordering of the c9 column).

Review Comment:
   This comment is unclear -- the requirements you mention (`rn1 ASC, c9 DESC`) are the same as the post-window ordering you mention. Let's improve the comment to accurately reflect what is going on here.



[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6160: Add Support for Ordering Equivalence

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6160:
URL: https://github.com/apache/arrow-datafusion/pull/6160#discussion_r1182209925


##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -205,9 +209,56 @@ pub fn project_equivalence_properties(
     for class in ec_classes.iter_mut() {
         let mut columns_to_remove = vec![];
         for column in class.iter() {
-            if column.index() >= schema.fields().len()
-                || schema.fields()[column.index()].name() != column.name()
+            let fields = schema.fields();
+            let idx = column.index();
+            if idx >= fields.len() || fields[idx].name() != column.name() {
+                columns_to_remove.push(column.clone());
+            }
+        }
+        for column in columns_to_remove {
+            class.remove(&column);
+        }
+    }
+    ec_classes.retain(|props| props.len() > 1);
+    output_eq.extend(ec_classes);
+}
+
+/// This function applies the given projection to the given ordering
+/// equivalence properties to compute the resulting (projected) ordering
+/// equivalence properties; e.g.
+/// 1) Adding an alias, which can introduce additional ordering equivalence
+///    properties, as in Projection(a, a as a1, a as a2) extends global ordering
+///    of a to a1 and a2.
+/// 2) Truncate the [`OrderingEquivalentClass`]es that are not in the output schema.
+pub fn project_ordering_equivalence_properties(
+    input_eq: OrderingEquivalenceProperties,
+    columns_map: &HashMap<Column, Vec<Column>>,
+    output_eq: &mut OrderingEquivalenceProperties,
+) {
+    let mut ec_classes = input_eq.classes().to_vec();
+    for (column, columns) in columns_map {
+        for class in ec_classes.iter_mut() {
+            if let Some(OrderedColumn { options, .. }) =

Review Comment:
   I could move some of the common code to a function. 



[GitHub] [arrow-datafusion] mingmwang commented on pull request #6160: Add Support for Ordering Equivalence

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6160:
URL: https://github.com/apache/arrow-datafusion/pull/6160#issuecomment-1528999772

   Is this related to functional dependencies?


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6160: Add Support for Ordering Equivalence

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6160:
URL: https://github.com/apache/arrow-datafusion/pull/6160#discussion_r1183517837


##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -112,41 +102,61 @@ impl EquivalenceProperties {
     }
 }
 
-/// Equivalent Class is a set of Columns that are known to have the same value in all tuples in a relation
-/// Equivalent Class is generated by equality predicates, typically equijoin conditions and equality conditions in filters.
+/// `OrderingEquivalenceProperties` keeps track of columns that describe the

Review Comment:
   👍 



[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6160: Add Support for Ordering Equivalence

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6160:
URL: https://github.com/apache/arrow-datafusion/pull/6160#discussion_r1181636859


##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -205,9 +209,56 @@ pub fn project_equivalence_properties(
     for class in ec_classes.iter_mut() {
         let mut columns_to_remove = vec![];
         for column in class.iter() {
-            if column.index() >= schema.fields().len()
-                || schema.fields()[column.index()].name() != column.name()
+            let fields = schema.fields();
+            let idx = column.index();
+            if idx >= fields.len() || fields[idx].name() != column.name() {
+                columns_to_remove.push(column.clone());
+            }
+        }
+        for column in columns_to_remove {
+            class.remove(&column);
+        }
+    }
+    ec_classes.retain(|props| props.len() > 1);
+    output_eq.extend(ec_classes);
+}
+
+/// This function applies the given projection to the given ordering

Review Comment:
   When reading this PR, it seems to me that these functions (and those in datafusion/physical-expr/src/utils.rs that take an `EquivalenceProperties` / `OrderingEquivalenceProperties` as the first argument) would be easier to find / use if they were methods on `EquivalenceProperties` / `OrderingEquivalenceProperties`.
   
   So like
   
   ```rust
   impl OrderingEquivalenceProperties {
     fn project_ordering_equivalence_properties(self, columns_map: ..., output_eq: ...) { ... }
   }
   ```
   
   I realize that, given how this code is currently structured as a typedef, this would be hard to do.
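   For what it's worth, Rust does accept an inherent `impl` block written against a type alias, as long as the aliased generic type is defined in the same crate; the methods then apply only to that instantiation. A minimal sketch with simplified, hypothetical stand-ins for `Column`, `OrderedColumn` and `EquivalenceProperties<T>` (only the alias-extension step of the projection is shown, not the schema truncation):

   ```rust
   use std::collections::HashMap;

   #[derive(Debug, Clone, PartialEq, Eq, Hash)]
   pub struct Column {
       pub name: String,
       pub index: usize,
   }

   #[derive(Debug, Clone, PartialEq, Eq)]
   pub struct OrderedColumn {
       pub col: Column,
       pub descending: bool,
   }

   /// Simplified generic container: each inner `Vec` is one equivalence class.
   #[derive(Debug)]
   pub struct EquivalenceProperties<T> {
       classes: Vec<Vec<T>>,
   }

   impl<T> EquivalenceProperties<T> {
       pub fn new(classes: Vec<Vec<T>>) -> Self {
           Self { classes }
       }

       pub fn classes(&self) -> &[Vec<T>] {
           &self.classes
       }
   }

   pub type OrderingEquivalenceProperties = EquivalenceProperties<OrderedColumn>;

   // Inherent impl written against the alias: allowed because the generic
   // type is local to this crate. It applies only to `OrderedColumn` classes.
   impl OrderingEquivalenceProperties {
       /// Hypothetical method form of the alias-extension step: if a class
       /// contains `source` in some direction, every alias of `source` joins
       /// that class with the same direction.
       pub fn project(&self, columns_map: &HashMap<Column, Vec<Column>>) -> Self {
           let mut classes = self.classes.clone();
           for (source, aliases) in columns_map {
               for class in classes.iter_mut() {
                   let direction = class
                       .iter()
                       .find(|oc| &oc.col == source)
                       .map(|oc| oc.descending);
                   if let Some(descending) = direction {
                       for alias in aliases {
                           class.push(OrderedColumn { col: alias.clone(), descending });
                       }
                   }
               }
           }
           Self { classes }
       }
   }
   ```

   Callers would then write `props.project(&columns_map)`, and a separate `impl EquivalenceProperties<Column>` block could define its own `project` without conflict, since the two instantiations do not overlap.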



##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -205,9 +209,56 @@ pub fn project_equivalence_properties(
     for class in ec_classes.iter_mut() {
         let mut columns_to_remove = vec![];
         for column in class.iter() {
-            if column.index() >= schema.fields().len()
-                || schema.fields()[column.index()].name() != column.name()
+            let fields = schema.fields();
+            let idx = column.index();
+            if idx >= fields.len() || fields[idx].name() != column.name() {
+                columns_to_remove.push(column.clone());
+            }
+        }
+        for column in columns_to_remove {
+            class.remove(&column);
+        }
+    }
+    ec_classes.retain(|props| props.len() > 1);
+    output_eq.extend(ec_classes);
+}
+
+/// This function applies the given projection to the given ordering
+/// equivalence properties to compute the resulting (projected) ordering
+/// equivalence properties; e.g.
+/// 1) Adding an alias, which can introduce additional ordering equivalence
+///    properties, as in Projection(a, a as a1, a as a2) extends global ordering
+///    of a to a1 and a2.
+/// 2) Truncate the [`OrderingEquivalentClass`]es that are not in the output schema.
+pub fn project_ordering_equivalence_properties(
+    input_eq: OrderingEquivalenceProperties,
+    columns_map: &HashMap<Column, Vec<Column>>,
+    output_eq: &mut OrderingEquivalenceProperties,
+) {
+    let mut ec_classes = input_eq.classes().to_vec();
+    for (column, columns) in columns_map {
+        for class in ec_classes.iter_mut() {
+            if let Some(OrderedColumn { options, .. }) =

Review Comment:
   this seems almost identical to `project_equivalence_properties` except that the check for equivalence and the insertion of new columns is different. I wonder if there is some way to combine them 🤔 
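   One possible way to combine them, sketched under assumptions: a hypothetical `ClassMember` trait captures the two points where the functions differ (how a member is matched to a column, and how an alias member is built), and a single generic `project_classes` does the rest. The types are simplified stand-ins, the alias map is a slice of pairs instead of a `HashMap`, and the output schema is reduced to a list of field names:

   ```rust
   #[derive(Debug, Clone, PartialEq, Eq)]
   pub struct Column {
       pub name: String,
       pub index: usize,
   }

   #[derive(Debug, Clone, PartialEq, Eq)]
   pub struct OrderedColumn {
       pub col: Column,
       pub descending: bool,
   }

   /// Hypothetical trait covering the parts that differ between the two routines.
   pub trait ClassMember: Clone {
       /// The column this member refers to.
       fn column(&self) -> &Column;
       /// A new member for `alias` that is equivalent to `self`.
       fn with_column(&self, alias: &Column) -> Self;
   }

   impl ClassMember for Column {
       fn column(&self) -> &Column { self }
       fn with_column(&self, alias: &Column) -> Self { alias.clone() }
   }

   impl ClassMember for OrderedColumn {
       fn column(&self) -> &Column { &self.col }
       // An alias inherits the sort direction of the member it aliases.
       fn with_column(&self, alias: &Column) -> Self {
           OrderedColumn { col: alias.clone(), descending: self.descending }
       }
   }

   /// Shared projection logic for both kinds of classes:
   /// 1) add aliases to any class containing their source column,
   /// 2) drop members whose index/name no longer match the output fields,
   /// 3) drop classes left with fewer than two members.
   pub fn project_classes<T: ClassMember>(
       classes: &[Vec<T>],
       columns_map: &[(Column, Vec<Column>)],
       output_fields: &[&str],
   ) -> Vec<Vec<T>> {
       let mut out = classes.to_vec();
       for (source, aliases) in columns_map {
           for class in out.iter_mut() {
               let matching = class.iter().find(|m| m.column() == source).cloned();
               if let Some(member) = matching {
                   for alias in aliases {
                       class.push(member.with_column(alias));
                   }
               }
           }
       }
       for class in out.iter_mut() {
           class.retain(|m| {
               let col = m.column();
               output_fields.get(col.index).map_or(false, |name| *name == col.name.as_str())
           });
       }
       out.retain(|class| class.len() > 1);
       out
   }
   ```

   Only `column` and `with_column` differ per member type; truncation and the removal of singleton classes are written once.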



##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -219,6 +270,26 @@ pub fn project_equivalence_properties(
     output_eq.extend(ec_classes);
 }
 
+/// Finds matching column inside OrderingEquivalentClass.
+fn get_matching_column(

Review Comment:
   This seems like a great candidate for a method
   
   ```rust
   impl OrderingEquivalentClass {
     fn get_matching_column(&self, column: &Column) -> T {
     ...
     }
   }
   ```
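   A sketch of that method, assuming a simplified class layout with a `head` and a `Vec` of `others` (hypothetical; not the actual struct definition). Returning `Option<&OrderedColumn>` lets callers tell "not in this class" apart from a match, and the match carries the sort direction to reuse:

   ```rust
   #[derive(Debug, Clone, PartialEq, Eq)]
   pub struct Column {
       pub name: String,
       pub index: usize,
   }

   #[derive(Debug, Clone, PartialEq, Eq)]
   pub struct OrderedColumn {
       pub col: Column,
       pub descending: bool,
   }

   /// Simplified class: a head plus the other members (hypothetical layout).
   pub struct OrderingEquivalentClass {
       pub head: OrderedColumn,
       pub others: Vec<OrderedColumn>,
   }

   impl OrderingEquivalentClass {
       /// Returns the member (head or other) that refers to `column`, if any.
       pub fn get_matching_column(&self, column: &Column) -> Option<&OrderedColumn> {
           std::iter::once(&self.head)
               .chain(self.others.iter())
               .find(|member| &member.col == column)
       }
   }
   ```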



##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -112,41 +102,45 @@ impl EquivalenceProperties {
     }
 }
 
-/// Equivalent Class is a set of Columns that are known to have the same value in all tuples in a relation
-/// Equivalent Class is generated by equality predicates, typically equijoin conditions and equality conditions in filters.
+pub type OrderingEquivalenceProperties = EquivalenceProperties<OrderedColumn>;
+
+/// EquivalentClass is a set of [`Column`]s or [`OrderedColumn`]s that are known
+/// to have the same value in all tuples in a relation. This object is generated
+/// by equality predicates, typically equijoin conditions and equality conditions
+/// in filters.

Review Comment:
   I think this documentation is not quite correct for `OrderingEquivalenceClass`, since in that case the ordering can come from join predicates as well as (potentially) window aggregates.
   
   
   
   



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -911,12 +1064,18 @@ mod tests {
         ];
         let finer = Some(&finer[..]);
         let empty_schema = &Arc::new(Schema::empty());

Review Comment:
   I wonder if some more unit tests would be valuable here, mostly as a way to document how the functions are supposed to work. Trying to write more focused tests might also act as a forcing function to create fewer new functions / refactor functions to share more functionality.



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -196,27 +199,156 @@ pub fn normalize_sort_requirement_with_equivalence_properties(
     }
 }
 
+pub fn normalize_expr_with_ordering_equivalence_properties(

Review Comment:
   I wonder if there is some way to reduce the code replication here -- SortRequirement and SortExpr are very similar, as are `OrderingEquivalentClass` and `EquivalentClass`



##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2204,3 +2204,78 @@ SELECT SUM(c12) OVER(ORDER BY c1, c2 GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
 2.994840293343 NULL
 9.674390599321 NULL
 7.728066219895 NULL
+
+# test_c9_rn_ordering_alias
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 ASC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 ASC)
+   ORDER BY rn1
+   LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: rn1 ASC NULLS LAST, fetch=5
+    Sort: aggregate_test_100.c9 ASC NULLS LAST
+      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+          TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
+    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+      SortExec: expr=[c9@0 ASC NULLS LAST]
+        CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+
+query II
+SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 ASC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 ASC)
+   ORDER BY rn1
+   LIMIT 5
+----
+28774375 1
+63044568 2
+141047417 3
+141680161 4
+145294611 5
+
+# test_c9_rn_ordering_alias_opposite_direction
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 DESC)
+   ORDER BY rn1
+   LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: rn1 ASC NULLS LAST, fetch=5
+    Sort: aggregate_test_100.c9 DESC NULLS FIRST
+      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+          TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
+    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+      SortExec: expr=[c9@0 DESC]
+        CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+
+query II
+SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 DESC)
+   ORDER BY rn1
+   LIMIT 5
+----
+4268716378 1
+4229654142 2
+4216440507 3
+4144173353 4
+4076864659 5
+

Review Comment:
   Could you please add some negative cases too, for example when the subquery is not ordered the same way as the window function?
   
   ```
   EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
                      ROW_NUMBER() OVER(ORDER BY c9 ASC) as rn1
                      FROM aggregate_test_100
                      ORDER BY c9 DESC)
      ORDER BY rn1
      LIMIT 5
   ```
   
   It would also be good I think to add something like 
   
   ```sql
   ORDER BY rn1, c9
   ```
   
   To cover the 'more than one element in the equivalence class' case.



##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2204,3 +2204,78 @@ SELECT SUM(c12) OVER(ORDER BY c1, c2 GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
 2.994840293343 NULL
 9.674390599321 NULL
 7.728066219895 NULL
+
+# test_c9_rn_ordering_alias

Review Comment:
   Maybe worth a comment here saying the intention is that there is no `SortExec` after the `BoundedWindowAggExec`.



[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6160: Add Support for Ordering Equivalence

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6160:
URL: https://github.com/apache/arrow-datafusion/pull/6160#discussion_r1182411654


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -911,12 +1064,18 @@ mod tests {
         ];
         let finer = Some(&finer[..]);
         let empty_schema = &Arc::new(Schema::empty());

Review Comment:
   I have added unit tests for each normalization function.



[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6160: Add Support for Ordering Equivalence

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6160:
URL: https://github.com/apache/arrow-datafusion/pull/6160#discussion_r1183325118


##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2204,3 +2204,163 @@ SELECT SUM(c12) OVER(ORDER BY c1, c2 GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
 2.994840293343 NULL
 9.674390599321 NULL
 7.728066219895 NULL
+
+# test_c9_rn_ordering_alias
+# These tests check whether DataFusion is aware of the ordering generated by the ROW_NUMBER() window function.
+# The physical plan shouldn't have a SortExec after the BoundedWindowAggExec, since the table after BoundedWindowAggExec is already ordered by rn1 ASC and c9 ASC.
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 ASC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 ASC)
+   ORDER BY rn1
+   LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: rn1 ASC NULLS LAST, fetch=5
+    Sort: aggregate_test_100.c9 ASC NULLS LAST
+      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+          TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
+    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+      SortExec: expr=[c9@0 ASC NULLS LAST]
+        CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+
+query II
+SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 ASC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 ASC)
+   ORDER BY rn1
+   LIMIT 5
+----
+28774375 1
+63044568 2
+141047417 3
+141680161 4
+145294611 5
+
+# test_c9_rn_ordering_alias_opposite_direction
+# These tests check whether DataFusion is aware of the ordering generated by the ROW_NUMBER() window function.
+# Physical plan shouldn't have a SortExec after the BoundedWindowAggExec since the table after BoundedWindowAggExec is already ordered by rn1 ASC and c9 DESC.
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 DESC)
+   ORDER BY rn1
+   LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: rn1 ASC NULLS LAST, fetch=5
+    Sort: aggregate_test_100.c9 DESC NULLS FIRST
+      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+          TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
+    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+      SortExec: expr=[c9@0 DESC]
+        CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+
+query II
+SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 DESC)
+   ORDER BY rn1
+   LIMIT 5
+----
+4268716378 1
+4229654142 2
+4216440507 3
+4144173353 4
+4076864659 5
+
+# test_c9_rn_ordering_alias_opposite_direction2
+# These tests check whether DataFusion is aware of the ordering generated by the ROW_NUMBER() window function.
+# The physical plan _should_ have a SortExec after BoundedWindowAggExec, since the table after BoundedWindowAggExec is ordered by rn1 ASC and c9 DESC, which conflicts with the requirement rn1 DESC.
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 DESC)
+   ORDER BY rn1 DESC
+   LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: rn1 DESC NULLS FIRST, fetch=5
+    Sort: aggregate_test_100.c9 DESC NULLS FIRST
+      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
+        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+          TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  SortExec: fetch=5, expr=[rn1@1 DESC]
+    ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
+      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+        SortExec: expr=[c9@0 DESC]
+          CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c9]
+
+query II
+SELECT c9, rn1 FROM (SELECT c9,
+               ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+               FROM aggregate_test_100
+               ORDER BY c9 DESC)
+   ORDER BY rn1 DESC
+   LIMIT 5
+----
+28774375 100
+63044568 99
+141047417 98
+141680161 97
+145294611 96
+
+# test_c9_rn_ordering_alias_opposite_direction3
+# These tests check whether DataFusion is aware of the ordering of the column generated by the ROW_NUMBER() window function.
+# The physical plan should have a SortExec after BoundedWindowAggExec,
+# since the table after BoundedWindowAggExec is ordered by rn1 ASC and c9 DESC, which conflicts with the requirement rn1 ASC, c9 DESC
+# (possibly violating the global ordering of the c9 column).

Review Comment:
   I have updated the comment and modified the test to make the intent clearer.





[GitHub] [arrow-datafusion] alamb commented on pull request #6160: Add Support for Ordering Equivalence

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6160:
URL: https://github.com/apache/arrow-datafusion/pull/6160#issuecomment-1529002253

   Thank you for this PR @mustafasrepo. I skimmed it, and one question I have is: why add a new `ordering_equivalence_properties` when DataFusion already has `equivalence_properties`, which seems to be a more general concept?
   
   It is confusing to me because the existence of `ordering_equivalence_properties` implies that `equivalence_properties` doesn't apply to ordering.
   
   If the existing `equivalence_properties` doesn't support ordering aliasing, did you consider making it more general?
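A small simulation may make the distinction in this question concrete. The sketch below is plain Rust, not DataFusion code; `window_output` and `is_sorted_by` are hypothetical helpers. It mimics the stream produced for `ROW_NUMBER() OVER(ORDER BY c9 DESC) AS rn1`, using the five largest c9 values from the test output above. If `equivalence_properties` tracks columns holding identical values (the "aliased columns" described in the PR rationale), c9 and rn1 could not share such a class, since their values never match; yet the stream is sorted by both c9 DESC and rn1 ASC, while rn1 DESC would still need a sort.

```rust
use std::cmp::Reverse;

/// Hypothetical simulation of the window output: rows arrive sorted by
/// c9 DESC and rn1 counts 1, 2, 3, ... in that order. Returns (c9, rn1).
fn window_output(mut c9: Vec<u64>) -> Vec<(u64, u64)> {
    c9.sort_unstable_by(|a, b| b.cmp(a)); // c9 DESC
    c9.into_iter()
        .enumerate()
        .map(|(i, v)| (v, i as u64 + 1))
        .collect()
}

/// True if `rows` is non-decreasing under `key`.
fn is_sorted_by<T, K: Ord>(rows: &[T], key: impl Fn(&T) -> K) -> bool {
    rows.windows(2).all(|w| key(&w[0]) <= key(&w[1]))
}
```

For example, `window_output(vec![4144173353, 4268716378, 4076864659, 4229654142, 4216440507])` starts with `(4268716378, 1)`, matching the first row of the `ORDER BY rn1` result; `is_sorted_by` holds for keys `Reverse(c9)` and `rn1`, but not for `Reverse(rn1)`.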

