You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "mustafasrepo (via GitHub)" <gi...@apache.org> on 2023/07/13 15:29:46 UTC

[GitHub] [arrow-datafusion] mustafasrepo opened a new pull request, #6956: Extend Ordering Equivalence Support

mustafasrepo opened a new pull request, #6956:
URL: https://github.com/apache/arrow-datafusion/pull/6956

   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes #.
   
   # Rationale for this change
   Currently ordering equivalence properties cannot be propagated through PhysicalPlan. However, for executors that maintain its input order, we should be able to propagate ordering equivalence properties. This PR implements ordering equivalence properties for executors that maintains ordering. For most of the executors, this is trivial (such as `FilterExec`).
   However, for `SortMergeJoinExec` and `HashJoinExec` updating ordering equivalence involves non-trivial implementation. Reviewers can focus on these executors.
   
   Also during tests for propagating ordering equivalence properties through executors. I found a bug in how Projection handles ordering equivalence and equivalence. This PR fixes it also.
   <!--
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.  
   -->
   
   # What changes are included in this PR?
   
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   # Are these changes tested?
   
   Yes new tests are added to `window.slt` and `joins.slt` file
   <!--
   We typically require tests for all PRs in order to:
   1. Prevent the code from being accidentally broken by subsequent changes
   2. Serve as another way to document the expected behavior of the code
   
   If tests are not included in your PR, please explain why (for example, are they covered by existing tests)?
   -->
   
   # Are there any user-facing changes?
   
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6956: Extend Ordering Equivalence Support

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6956:
URL: https://github.com/apache/arrow-datafusion/pull/6956#discussion_r1264963749


##########
datafusion/core/src/physical_plan/coalesce_partitions.rs:
##########
@@ -107,6 +107,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
         self.input.equivalence_properties()
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {

Review Comment:
   Actually, when existing ordering is invalidated (`maintains_input_order` is false). `OrderingEquivalenceProperties` shouldn't be propagated (unlike `Equivalence`). The reason is that `OrderingEquivalence` contains different alternative versions to describe ordering of the table. Once ordering is lost, all of the alternatives descriptions are invalid also. Hence this implementation should be removed. I removed this implementation
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6956: Extend Ordering Equivalence Support

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6956:
URL: https://github.com/apache/arrow-datafusion/pull/6956#discussion_r1264972151


##########
datafusion/core/src/physical_plan/joins/sort_merge_join.rs:
##########
@@ -295,6 +321,65 @@ impl ExecutionPlan for SortMergeJoinExec {
         )
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+        let mut new_properties = OrderingEquivalenceProperties::new(self.schema());

Review Comment:
   Since `SymmetricHashJoin` doesn't maintain input ordering under any circumstance (since it has alternating build sides), its ordering_equivalence implementation should be empty.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6956: Extend Ordering Equivalence Support

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6956:
URL: https://github.com/apache/arrow-datafusion/pull/6956#discussion_r1264963749


##########
datafusion/core/src/physical_plan/coalesce_partitions.rs:
##########
@@ -107,6 +107,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
         self.input.equivalence_properties()
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {

Review Comment:
   Actually, when existing ordering is invalidated (`maintains_input_order` is false). `OrderingEquivalenceProperties` shouldn't be propagated (unlike `Equivalence`). The reason is that `OrderingEquivalence` contains different alternative versions to describe ordering of the table. Once ordering is lost, all of the alternatives descriptions are invalid also. Hence this implementation should be removed. I removed this implementation.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6956: Extend Ordering Equivalence Support

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6956:
URL: https://github.com/apache/arrow-datafusion/pull/6956#discussion_r1264903534


##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -3055,6 +3078,28 @@ statement error DataFusion error: Error during planning: Aggregate ORDER BY is n
 EXPLAIN SELECT a, b, LAST_VALUE(c ORDER BY a ASC) OVER (order by a ASC) as last_c
        FROM annotated_data_infinite2
 
+# ordering equivalence information
+# should propagate through FilterExec, LimitExec, etc.

Review Comment:
   Fixed it, :thumbsup:



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo merged pull request #6956: Extend Ordering Equivalence Support

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo merged PR #6956:
URL: https://github.com/apache/arrow-datafusion/pull/6956


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6956: Extend Ordering Equivalence Support

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6956:
URL: https://github.com/apache/arrow-datafusion/pull/6956#discussion_r1264092547


##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -381,6 +376,34 @@ impl ExecutionPlan for HashJoinExec {
         )
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {

Review Comment:
   I think this calculation should be the same for other `ExecutionPlans` like CrossJoin / NestedLoopsJoin as well as the ordering equivalence is a function of the join predicates and type (INNER/OUTER, etc), not the algorithms?  
   
   I wonder if we should move this to a common location rather than just for `HashJoinExec`



##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -3055,6 +3078,28 @@ statement error DataFusion error: Error during planning: Aggregate ORDER BY is n
 EXPLAIN SELECT a, b, LAST_VALUE(c ORDER BY a ASC) OVER (order by a ASC) as last_c
        FROM annotated_data_infinite2
 
+# ordering equivalence information
+# should propagate through FilterExec, LimitExec, etc.

Review Comment:
   This comment implies to me that the test should have a `WHERE` clause as well, but the query does not have one. The plan looks pretty much the same to me as the first plan in this file
   
   I think we should correct the test or the comment so they are consistent



##########
datafusion/core/src/physical_plan/coalesce_partitions.rs:
##########
@@ -107,6 +107,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
         self.input.equivalence_properties()
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {

Review Comment:
   Even though `CoalescePartitionsExec` doesn't preserve the input sort order I agree it preserves the input equivalences 👍 



##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2429,6 +2429,29 @@ GlobalLimitExec: skip=0, fetch=5
 ------SortExec: expr=[CAST(c9@1 AS Int32) + c5@0 DESC]
 --------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5, c9], has_header=true
 
+# Ordering equivalence should be preserved during cast expression
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                       CAST(ROW_NUMBER() OVER(ORDER BY c9 DESC) as BIGINT) as rn1
+                       FROM aggregate_test_100
+                       ORDER BY c9 DESC)
+       ORDER BY rn1 ASC
+       LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Sort: rn1 ASC NULLS LAST, fetch=5
+----Sort: aggregate_test_100.c9 DESC NULLS FIRST
+------Projection: aggregate_test_100.c9, CAST(ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS Int64) AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5

Review Comment:
   they key is that the final sort on `rn1` has been optimized away 👍 



##########
datafusion/core/src/physical_plan/joins/sort_merge_join.rs:
##########
@@ -295,6 +321,65 @@ impl ExecutionPlan for SortMergeJoinExec {
         )
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+        let mut new_properties = OrderingEquivalenceProperties::new(self.schema());

Review Comment:
   Does this same logic apply to "SymmetricHashJoin" too?



##########
datafusion/core/src/physical_plan/coalesce_partitions.rs:
##########
@@ -107,6 +107,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
         self.input.equivalence_properties()
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {

Review Comment:
   Even though `CoalescePartitionsExec` doesn't preserve the input sort order I agree it preserves the input equivalences 👍 



##########
datafusion/core/src/physical_plan/joins/sort_merge_join.rs:
##########
@@ -134,10 +166,26 @@ impl SortMergeJoinExec {
             .unzip();
 
         let output_ordering = match join_type {
-            JoinType::Inner
-            | JoinType::Left
-            | JoinType::LeftSemi
-            | JoinType::LeftAnti => {
+            JoinType::Inner => {
+                match (left.output_ordering(), right.output_ordering()) {
+                    // If both sides have orderings, ordering of the right hand side

Review Comment:
   👍 



##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2429,6 +2429,29 @@ GlobalLimitExec: skip=0, fetch=5
 ------SortExec: expr=[CAST(c9@1 AS Int32) + c5@0 DESC]
 --------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5, c9], has_header=true
 
+# Ordering equivalence should be preserved during cast expression
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                       CAST(ROW_NUMBER() OVER(ORDER BY c9 DESC) as BIGINT) as rn1
+                       FROM aggregate_test_100
+                       ORDER BY c9 DESC)
+       ORDER BY rn1 ASC
+       LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Sort: rn1 ASC NULLS LAST, fetch=5
+----Sort: aggregate_test_100.c9 DESC NULLS FIRST
+------Projection: aggregate_test_100.c9, CAST(ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS Int64) AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5

Review Comment:
   they key is that the final sort on `rn1` has been optimized away 👍 



##########
datafusion/core/src/physical_plan/joins/sort_merge_join.rs:
##########
@@ -134,10 +166,26 @@ impl SortMergeJoinExec {
             .unzip();
 
         let output_ordering = match join_type {
-            JoinType::Inner
-            | JoinType::Left
-            | JoinType::LeftSemi
-            | JoinType::LeftAnti => {
+            JoinType::Inner => {
+                match (left.output_ordering(), right.output_ordering()) {
+                    // If both sides have orderings, ordering of the right hand side

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #6956: Extend Ordering Equivalence Support

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #6956:
URL: https://github.com/apache/arrow-datafusion/pull/6956#discussion_r1265104516


##########
datafusion/core/src/physical_plan/coalesce_partitions.rs:
##########
@@ -107,6 +107,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
         self.input.equivalence_properties()
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {

Review Comment:
   For posterity: There could be two meanings we can assign to ordering equivalences:
   1. Entries in the ordering equivalence set describe different, alternative descriptions of *the current table ordering*. This is @mustafasrepo's interpretation.
   2. Entries in the ordering equivalence set describe different, alternative sorts one can apply to this table and end up with the same ordering. This is @alamb's interpretation.
   
   (2) is more general than (1), and it opens up possibilities for more optimizations. However, it is also more difficult to implement in all cases. One can construct examples where the invariant described by (2) is lost when one has order-destroying operators interspersed in between order-imposing operators in deep-ish plans.
   
   We just had a nice discussion with @mustafasrepo on this, and we added generalizing from (1) to (2) to our roadmap of future improvements. Let's get (1) done in a robust way first, and in the future we will revisit this and figure out how to get to (2), and apply even more optimizations!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6956: Extend Ordering Equivalence Support

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6956:
URL: https://github.com/apache/arrow-datafusion/pull/6956#discussion_r1265888566


##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -381,6 +376,34 @@ impl ExecutionPlan for HashJoinExec {
         )
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {

Review Comment:
   Makes sense -- thank you



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6956: Extend Ordering Equivalence Support

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6956:
URL: https://github.com/apache/arrow-datafusion/pull/6956#discussion_r1264970204


##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -381,6 +376,34 @@ impl ExecutionPlan for HashJoinExec {
         )
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {

Review Comment:
   Similar to above reasoning, only `HashJoinExec` maintains_input_order under certain conditions. Hence, the only join that needs ordering_equivalence implementation is `HashJoinExec`. However, other joins such as `CrossJoin`, `NestedLoopsJoin ` may maintain ordering under certain conditions if so ordering equivalence can be implemented for them also. I will analyze their implementation, if there are case that ordering can be maintained, I will implement their `maintains_input_order` and `ordering_equivalence_properties` methods in follow-up PRs. During that PR we can move, this implementation to a common location if there is a pattern between `ordering_equivalence_properties` of different joins.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org