You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "mustafasrepo (via GitHub)" <gi...@apache.org> on 2023/04/11 07:01:11 UTC

[GitHub] [arrow-datafusion] mustafasrepo opened a new pull request, #5951: Treat Partition by columns as set for window functions

mustafasrepo opened a new pull request, #5951:
URL: https://github.com/apache/arrow-datafusion/pull/5951

   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes [#5915](https://github.com/apache/arrow-datafusion/issues/5915).
   
   # Rationale for this change
   
   <!--
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.  
   -->
   Currently when window frame is `PARTITION BY a,b ..` and input is ordered by `b, a`. `WindowAggExec` expects its input to be ordered by `a, b`. Hence we need to add another `SortExec` before `WindowAggExec` to change ordering from `b, a` to `a, b`. However, if we were to use `PARTITION BY b,a ..` existing ordering would work. The algorithm for removing sort should treat partition by columns as set. Algorithm shouldn't depend on the order of the columns. With this functionality we can produce better plans.
   
   # What changes are included in this PR?
   
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   With the changes in this PR we do WindowSortRemoval analysis more cleverly. Previously, algorithm was depended on the order of partition by expressions seen in the window expression. Now we treat `PARTITION BY` columns as mathematical set. Consider the query below
   ```sql
   SELECT c9,
     SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1,
     SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum2
   FROM aggregate_test_100
   ``` 
   Previously it was generating the plan
   ``` sql
   "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@14 as sum2]",
   "  WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]",
   "    SortExec: expr=[c2@1 ASC NULLS LAST,c1@0 ASC NULLS LAST,c9@8 ASC NULLS LAST]",
   "      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]",
   "        SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]",
   "          CsvExec: files={1 group: [[<path>/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]",
   ``` 
   we now produce plan below
   ``` sql
   "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@14 as sum2]",
   "  WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]",
   "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]",
   "      SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]",
   "        CsvExec: files={1 group: [[<path>/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]",
   ``` 
   
   
   # Are these changes tested?
   New tests are added to check partition by columns are treated as set.
   <!--
   We typically require tests for all PRs in order to:
   1. Prevent the code from being accidentally broken by subsequent changes
   2. Serve as another way to document the expected behavior of the code
   
   If tests are not included in your PR, please explain why (for example, are they covered by existing tests)?
   -->
   
   # Are there any user-facing changes?
   
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb merged pull request #5951: Treat Partition by columns as set for window functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #5951:
URL: https://github.com/apache/arrow-datafusion/pull/5951


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #5951: Treat Partition by columns as set for window functions

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #5951:
URL: https://github.com/apache/arrow-datafusion/pull/5951#discussion_r1164674025


##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2036,6 +2036,184 @@ SELECT
 -1114 -1927628110
 15673 -1899175111
 
+# test_window_agg_partition_by_set
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+query TT
+EXPLAIN SELECT
+    c9,
+    SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
+    SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
+    FROM aggregate_test_100
+    ORDER BY c9
+    LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+    Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
+      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+        WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+          TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  SortExec: fetch=5, expr=[c9@0 ASC NULLS LAST]
+    ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum2]
+      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+        BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+          SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]
+            CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+
+query III
+SELECT c9,
+   SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
+   SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
+   FROM aggregate_test_100
+   ORDER BY c9
+   LIMIT 5
+----
+28774375 9144476174 9144476174
+63044568 5125627947 5125627947
+141047417 3650978969 3650978969
+141680161 8526017165 8526017165
+145294611 6802765992 6802765992
+
+# test_window_agg_partition_by_set2
+
+query TT
+EXPLAIN SELECT

Review Comment:
   I think this is a sister test to the one above, making sure that the set logic works for both bounded window cases (this one) and normal window cases (the test above). The exact frame here doesn't matter, but it has to result in the choice of a `WindowAggExec`. If you look at the frame in the sister test, it is a fixed-width frame which results in a `BoundedWindowExec`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on pull request #5951: Treat Partition by columns as set for window functions

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5951:
URL: https://github.com/apache/arrow-datafusion/pull/5951#issuecomment-1507410502

   Sounds good, will do, thank you


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on pull request #5951: Treat Partition by columns as set for window functions

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5951:
URL: https://github.com/apache/arrow-datafusion/pull/5951#issuecomment-1507197309

   This is good to go from our perspective, @alamb feel free to merge when you want


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5951: Treat Partition by columns as set for window functions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5951:
URL: https://github.com/apache/arrow-datafusion/pull/5951#discussion_r1162384232


##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2036,6 +2036,184 @@ SELECT
 -1114 -1927628110
 15673 -1899175111
 
+# test_window_agg_partition_by_set
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+query TT
+EXPLAIN SELECT
+    c9,
+    SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
+    SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
+    FROM aggregate_test_100
+    ORDER BY c9
+    LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+    Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
+      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+        WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+          TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  SortExec: fetch=5, expr=[c9@0 ASC NULLS LAST]
+    ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum2]
+      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+        BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+          SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]
+            CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+
+query III
+SELECT c9,
+   SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
+   SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
+   FROM aggregate_test_100
+   ORDER BY c9
+   LIMIT 5
+----
+28774375 9144476174 9144476174
+63044568 5125627947 5125627947
+141047417 3650978969 3650978969
+141680161 8526017165 8526017165
+145294611 6802765992 6802765992
+
+# test_window_agg_partition_by_set2
+
+query TT
+EXPLAIN SELECT

Review Comment:
   This test checks for whether `WindowAggExec` treats partition by expressions as set. Physical plan shouldn't have `SortExec` in between `WindowAggExec`s



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5951: Treat Partition by columns as set for window functions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5951:
URL: https://github.com/apache/arrow-datafusion/pull/5951#discussion_r1165133896


##########
datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs:
##########
@@ -73,6 +75,8 @@ pub struct BoundedWindowAggExec {
     pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    /// Partition by indices that define ordering

Review Comment:
   I added documentation on `get_ordered_partition_by_indices`. Updated comment here to refer documentation for more info. Thanks for suggestion.



##########
datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs:
##########
@@ -73,6 +75,8 @@ pub struct BoundedWindowAggExec {
     pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    /// Partition by indices that define ordering

Review Comment:
   I added documentation on `get_ordered_partition_by_indices`. Updated comment here to refer documentation for more info. Thanks for the suggestion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5951: Treat Partition by columns as set for window functions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5951:
URL: https://github.com/apache/arrow-datafusion/pull/5951#discussion_r1165088897


##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2036,6 +2036,184 @@ SELECT
 -1114 -1927628110
 15673 -1899175111
 
+# test_window_agg_partition_by_set
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+query TT
+EXPLAIN SELECT
+    c9,
+    SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
+    SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
+    FROM aggregate_test_100
+    ORDER BY c9
+    LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+    Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
+      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+        WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+          TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  SortExec: fetch=5, expr=[c9@0 ASC NULLS LAST]
+    ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum2]
+      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+        BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+          SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]
+            CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+
+query III
+SELECT c9,
+   SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
+   SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
+   FROM aggregate_test_100
+   ORDER BY c9
+   LIMIT 5
+----
+28774375 9144476174 9144476174
+63044568 5125627947 5125627947
+141047417 3650978969 3650978969
+141680161 8526017165 8526017165
+145294611 6802765992 6802765992
+
+# test_window_agg_partition_by_set2
+
+query TT
+EXPLAIN SELECT

Review Comment:
   Exactly, this is the case. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #5951: Treat Partition by columns as set for window functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5951:
URL: https://github.com/apache/arrow-datafusion/pull/5951#issuecomment-1507400729

   Thanks @mustafasrepo and @ozankabak  - please feel to merge PRs as well once they are approved and comments are addressed. There is no need to wait for me (I am trying to avoid being as much of a bottleneck)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5951: Treat Partition by columns as set for window functions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5951:
URL: https://github.com/apache/arrow-datafusion/pull/5951#discussion_r1162386051


##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2036,6 +2036,184 @@ SELECT
 -1114 -1927628110
 15673 -1899175111
 
+# test_window_agg_partition_by_set
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+query TT
+EXPLAIN SELECT
+    c9,
+    SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
+    SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
+    FROM aggregate_test_100
+    ORDER BY c9
+    LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+    Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
+      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+        WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+          TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  SortExec: fetch=5, expr=[c9@0 ASC NULLS LAST]
+    ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum2]
+      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+        BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+          SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]
+            CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+
+query III
+SELECT c9,
+   SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
+   SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
+   FROM aggregate_test_100
+   ORDER BY c9
+   LIMIT 5
+----
+28774375 9144476174 9144476174
+63044568 5125627947 5125627947
+141047417 3650978969 3650978969
+141680161 8526017165 8526017165
+145294611 6802765992 6802765992
+
+# test_window_agg_partition_by_set2
+
+query TT
+EXPLAIN SELECT
+    c9,
+    SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1,
+    SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum2
+    FROM aggregate_test_100
+    ORDER BY c9
+    LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+    Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING AS sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING AS sum2
+      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
+        WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
+          TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  SortExec: fetch=5, expr=[c9@0 ASC NULLS LAST]
+    ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@14 as sum2]
+      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+        WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+          SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]
+            CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+
+query III
+SELECT c9,
+   SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1,
+   SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum2
+   FROM aggregate_test_100
+   ORDER BY c9
+   LIMIT 5
+----
+28774375 12665844451 12665844451
+63044568 5125627947 5125627947
+141047417 3650978969 3650978969
+141680161 11924524414 11924524414
+145294611 6802765992 6802765992
+
+
+# test_window_agg_child_equivalence
+
+query TT
+EXPLAIN SELECT c9,
+  SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
+  SUM(c9) OVER(PARTITION BY c2, c1_alias ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
+  FROM (SELECT c1, c2, c9, c1 as c1_alias
+        FROM aggregate_test_100
+        ORDER BY c9) t1
+  LIMIT 5
+----
+logical_plan
+Projection: t1.c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
+  Limit: skip=0, fetch=5
+    WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+      WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+        SubqueryAlias: t1
+          Sort: aggregate_test_100.c9 ASC NULLS LAST
+            Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c9, aggregate_test_100.c1 AS c1_alias
+              TableScan: aggregate_test_100 projection=[c1, c2, c9]
+physical_plan
+ProjectionExec: expr=[c9@2 as c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@4 as sum1, SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@5 as sum2]
+  GlobalLimitExec: skip=0, fetch=5
+    BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+      BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+        SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@2 ASC NULLS LAST]
+          ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c9@2 as c9, c1@0 as c1_alias]
+            CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c9]
+
+
+query III
+SELECT c9,

Review Comment:
   This test checks whether equivalent expressions are considered during Sort removal analysis. Since `c1` and `c1_alias` same column we shouldn't add  unnecessary `SortExec` in between `WindowAggExec`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5951: Treat Partition by columns as set for window functions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5951:
URL: https://github.com/apache/arrow-datafusion/pull/5951#discussion_r1162383338


##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2036,6 +2036,184 @@ SELECT
 -1114 -1927628110
 15673 -1899175111
 
+# test_window_agg_partition_by_set
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+query TT
+EXPLAIN SELECT

Review Comment:
   This test checks for whether `BoundedWindowAggExec` treats partition by expressions as set. Physical plan shouldn't have `SortExec` in between `BoundedWindowAggExec`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #5951: Treat Partition by columns as set for window functions

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #5951:
URL: https://github.com/apache/arrow-datafusion/pull/5951#discussion_r1164674025


##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2036,6 +2036,184 @@ SELECT
 -1114 -1927628110
 15673 -1899175111
 
+# test_window_agg_partition_by_set
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+query TT
+EXPLAIN SELECT
+    c9,
+    SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
+    SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
+    FROM aggregate_test_100
+    ORDER BY c9
+    LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+    Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
+      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+        WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+          TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  SortExec: fetch=5, expr=[c9@0 ASC NULLS LAST]
+    ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum2]
+      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+        BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+          SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]
+            CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+
+query III
+SELECT c9,
+   SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
+   SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
+   FROM aggregate_test_100
+   ORDER BY c9
+   LIMIT 5
+----
+28774375 9144476174 9144476174
+63044568 5125627947 5125627947
+141047417 3650978969 3650978969
+141680161 8526017165 8526017165
+145294611 6802765992 6802765992
+
+# test_window_agg_partition_by_set2
+
+query TT
+EXPLAIN SELECT

Review Comment:
   I think this is a sister test to the one above, making sure that the set logic works for both normal window cases (this one) and bounded window cases (the test above). The exact frame here doesn't matter, but it has to result in the choice of a `WindowAggExec`. If you look at the frame in the sister test, it is a fixed-width frame which results in a `BoundedWindowExec`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5951: Treat Partition by columns as set for window functions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #5951:
URL: https://github.com/apache/arrow-datafusion/pull/5951#discussion_r1164633563


##########
datafusion/core/src/physical_optimizer/utils.rs:
##########
@@ -113,3 +175,49 @@ pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
 pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
     plan.as_any().is::<RepartitionExec>()
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_find_indices() -> Result<()> {
+        assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);
+        assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3])?, vec![0, 2, 1]);
+        assert_eq!(find_indices(&[3, 0, 4], [0, 3])?, vec![1, 0]);
+        assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
+        assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_merge_and_order_indices() {
+        assert_eq!(
+            merge_and_order_indices([0, 3, 4], [1, 3, 5]),
+            vec![0, 1, 3, 4, 5]
+        );
+        // Result should be ordered, even if inputs are not
+        assert_eq!(
+            merge_and_order_indices([3, 0, 4], [5, 1, 3]),
+            vec![0, 1, 3, 4, 5]
+        );
+    }
+
+    #[tokio::test]
+    async fn test_is_sorted() {
+        assert!(is_sorted([0, 3, 4]));
+        assert!(is_sorted([0, 1, 2]));
+        assert!(is_sorted([0, 1, 4]));
+        assert!(is_sorted([0usize; 0]));
+        assert!(is_sorted([1, 2]));
+        assert!(!is_sorted([3, 2]));
+    }
+
+    #[tokio::test]
+    async fn test_set_difference() {
+        assert_eq!(set_difference([0, 3, 4], [1, 2]), vec![0, 3, 4]);
+        assert_eq!(set_difference([0, 3, 4], [1, 2, 4]), vec![0, 3]);

Review Comment:
   it might be good to make a test for sets that are not ordered 
   
   like
   
   ```suggestion
           assert_eq!(set_difference([0, 3, 4], [1, 2, 4]), vec![0, 3]);
           assert_eq!(set_difference([0, 3, 4], [4, 1, 2]), vec![0, 3]);
           assert_eq!(set_difference([3, 4, 0], [4, 1, 2]), vec![3, 0]);
   ```



##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2036,6 +2036,184 @@ SELECT
 -1114 -1927628110
 15673 -1899175111
 
+# test_window_agg_partition_by_set
+statement ok

Review Comment:
   ```suggestion
   # These tests checks for whether BoundedWindowAggExec treats partition by expressions as set. 
   # Physical plan shouldn't have SortExec in between BoundedWindowAggExec
   
   
   statement ok
   ```



##########
datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs:
##########
@@ -428,10 +434,19 @@ impl SortedPartitionByBoundedWindowStream {
         input: SendableRecordBatchStream,
         baseline_metrics: BaselineMetrics,
         partition_by_sort_keys: Vec<PhysicalSortExpr>,
-    ) -> Self {
+        ordered_partition_by_indices: Vec<usize>,
+    ) -> Result<Self> {
         let state = window_expr.iter().map(|_| IndexMap::new()).collect();
         let empty_batch = RecordBatch::new_empty(schema.clone());
-        Self {
+
+        // In BoundedWindowAggExec all partition by columns should be ordered.
+        if window_expr[0].partition_by().len() != ordered_partition_by_indices.len() {
+            return Err(DataFusionError::Execution(

Review Comment:
   ```suggestion
               return Err(DataFusionError::Internal(
   ```



##########
datafusion/core/src/physical_optimizer/utils.rs:
##########
@@ -113,3 +175,49 @@ pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
 pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
     plan.as_any().is::<RepartitionExec>()
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_find_indices() -> Result<()> {
+        assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);
+        assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3])?, vec![0, 2, 1]);
+        assert_eq!(find_indices(&[3, 0, 4], [0, 3])?, vec![1, 0]);
+        assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
+        assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_merge_and_order_indices() {
+        assert_eq!(
+            merge_and_order_indices([0, 3, 4], [1, 3, 5]),
+            vec![0, 1, 3, 4, 5]
+        );
+        // Result should be ordered, even if inputs are not
+        assert_eq!(
+            merge_and_order_indices([3, 0, 4], [5, 1, 3]),
+            vec![0, 1, 3, 4, 5]
+        );
+    }
+
+    #[tokio::test]
+    async fn test_is_sorted() {
+        assert!(is_sorted([0, 3, 4]));

Review Comment:
   maybe we should also add a test for `[]` and `[1]` (aka zero length and 1 length slices)



##########
datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs:
##########
@@ -73,6 +75,8 @@ pub struct BoundedWindowAggExec {
     pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    /// Partition by indices that define ordering

Review Comment:
   Maybe putting this documentation on 
   
   ```
   pub(crate) fn get_ordered_partition_by_indices(
   ``` 
   
   and saying "see `get_ordered_partition_by_indices` for more details"  would be a good structure



##########
datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs:
##########
@@ -73,6 +75,8 @@ pub struct BoundedWindowAggExec {
     pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    /// Partition by indices that define ordering

Review Comment:
   I am still a little confused about what a partition by index that defines an ordering and one that does not define an ordering means. Maybe we could change the text to be more explicit 
   
   For example, is this a subset of the partition by expressions that are also part of hte ORDER by clause
   
   Maybe we could offer an example of what indexes this set would be for a query like
   
   ```
   OVER (PARTITION BY a, b ORDER BY y, z)
   ```



##########
datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs:
##########
@@ -428,10 +434,19 @@ impl SortedPartitionByBoundedWindowStream {
         input: SendableRecordBatchStream,
         baseline_metrics: BaselineMetrics,
         partition_by_sort_keys: Vec<PhysicalSortExpr>,
-    ) -> Self {
+        ordered_partition_by_indices: Vec<usize>,
+    ) -> Result<Self> {
         let state = window_expr.iter().map(|_| IndexMap::new()).collect();
         let empty_batch = RecordBatch::new_empty(schema.clone());
-        Self {
+
+        // In BoundedWindowAggExec all partition by columns should be ordered.
+        if window_expr[0].partition_by().len() != ordered_partition_by_indices.len() {
+            return Err(DataFusionError::Execution(

Review Comment:
   This should only happen if there is a bug, right? The error won't happen on poorly formed sql, is that right?



##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2036,6 +2036,184 @@ SELECT
 -1114 -1927628110
 15673 -1899175111
 
+# test_window_agg_partition_by_set
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+query TT
+EXPLAIN SELECT
+    c9,
+    SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
+    SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
+    FROM aggregate_test_100
+    ORDER BY c9
+    LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+    Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
+      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+        WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+          TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  SortExec: fetch=5, expr=[c9@0 ASC NULLS LAST]
+    ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum2]
+      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+        BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+          SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]
+            CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+
+query III
+SELECT c9,
+   SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
+   SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
+   FROM aggregate_test_100
+   ORDER BY c9
+   LIMIT 5
+----
+28774375 9144476174 9144476174
+63044568 5125627947 5125627947
+141047417 3650978969 3650978969
+141680161 8526017165 8526017165
+145294611 6802765992 6802765992
+
+# test_window_agg_partition_by_set2
+
+query TT
+EXPLAIN SELECT

Review Comment:
   Given the code doesn't seem to depend on the actual window function or bounds (PRECEDING AND UNBOUNDED FOLLOWING etc) I wonder if these additional cases add much additional coverage?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5951: Treat Partition by columns as set for window functions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5951:
URL: https://github.com/apache/arrow-datafusion/pull/5951#discussion_r1165169989


##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2036,6 +2036,184 @@ SELECT
 -1114 -1927628110
 15673 -1899175111
 
+# test_window_agg_partition_by_set
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+query TT
+EXPLAIN SELECT
+    c9,
+    SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
+    SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
+    FROM aggregate_test_100
+    ORDER BY c9
+    LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+    Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
+      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+        WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+          TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  SortExec: fetch=5, expr=[c9@0 ASC NULLS LAST]
+    ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum2]
+      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+        BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+          SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]
+            CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+
+query III
+SELECT c9,
+   SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
+   SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
+   FROM aggregate_test_100
+   ORDER BY c9
+   LIMIT 5
+----
+28774375 9144476174 9144476174
+63044568 5125627947 5125627947
+141047417 3650978969 3650978969
+141680161 8526017165 8526017165
+145294611 6802765992 6802765992
+
+# test_window_agg_partition_by_set2
+
+query TT
+EXPLAIN SELECT

Review Comment:
   > Given the code doesn't seem to depend on the actual window function or bounds (PRECEDING AND UNBOUNDED FOLLOWING etc) I wonder if these additional cases add much additional coverage?
   
   The motivations was to ensure new functionality works for that both `WindowAggExec` and `BoundedWindowExec`. However, two tests seem really redundant. I have written first test such that, it checks the functionality works for both executors. Then deleted the second test. I did similar change to `test_window_agg_child_equivalence` test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org