You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "mustafasrepo (via GitHub)" <gi...@apache.org> on 2023/03/14 08:32:02 UTC

[GitHub] [arrow-datafusion] mustafasrepo opened a new pull request, #5587: Do not break pipeline for window queries with GROUPS

mustafasrepo opened a new pull request, #5587:
URL: https://github.com/apache/arrow-datafusion/pull/5587

   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes [#5586](https://github.com/apache/arrow-datafusion/issues/5586).
   
   # Rationale for this change
   
   <!--
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.  
   -->
   Currently, when window queries involve `GROUPS` window frame type. We cannot run it using `BoundedWindowAggExec`. Instead we use `WindowAggExec`. This breaks pipeline, even if there is no theoretical reason to do so. With this change we solve this problem. See the [#5586](https://github.com/apache/arrow-datafusion/issues/5586) for better explanation.
   
   # What changes are included in this PR?
   
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   With this PR physical plan of the query
   ```sql
   SELECT
       SUM(c6) OVER(ORDER BY c2 GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as sum1
       FROM aggregate_test_100
   ```
   would turn from
   ```sql
   "ProjectionExec: expr=[SUM(aggregate_test_100.c6) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING@13 as sum1]",
   "  WindowAggExec: wdw=[SUM(aggregate_test_100.c6): Ok(Field { name: \"SUM(aggregate_test_100.c6)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Groups, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
   "    SortExec: expr=[c2@1 ASC NULLS LAST]",
   "      CsvExec: files={1 group: [[file_path]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]",
   ```
   to
   ```sql
   "ProjectionExec: expr=[SUM(aggregate_test_100.c6) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING@13 as sum1]",
   "  BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c6): Ok(Field { name: \"SUM(aggregate_test_100.c6)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Groups, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
   "    SortExec: expr=[c2@1 ASC NULLS LAST]",
   "      CsvExec: files={1 group: [[file_path]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]",
   ```
    This enables us to not break the pipeline when window queries involve `GROUPS` clause (given that there is no other pipeline-breaking executor in the plan). 
   
   # Are these changes tested?
   Yes. Fuzzy tests are extended to cover this use case. Also end to end tests are added.
   <!--
   We typically require tests for all PRs in order to:
   1. Prevent the code from being accidentally broken by subsequent changes
   2. Serve as another way to document the expected behavior of the code
   
   If tests are not included in your PR, please explain why (for example, are they covered by existing tests)?
   -->
   
   # Are there any user-facing changes?
   
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb merged pull request #5587: Do not break pipeline for window queries with GROUPS

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #5587:
URL: https://github.com/apache/arrow-datafusion/pull/5587


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5587: Do not break pipeline for window queries with GROUPS

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #5587:
URL: https://github.com/apache/arrow-datafusion/pull/5587#discussion_r1139185484


##########
datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs:
##########
@@ -548,9 +548,9 @@ impl SortedPartitionByBoundedWindowStream {
             for (partition_row, WindowState { state: value, .. }) in window_agg_state {
                 let n_prune =
                     min(value.window_frame_range.start, value.last_calculated_index);
-                if let Some(state) = n_prune_each_partition.get_mut(partition_row) {
-                    if n_prune < *state {
-                        *state = n_prune;
+                if let Some(current) = n_prune_each_partition.get_mut(partition_row) {

Review Comment:
   The changes in this file appear to be a drive by cleanup (as in not directly related to this PR) -- that is fine I am just confirming my understanding



##########
datafusion/core/tests/window_fuzz.rs:
##########
@@ -227,9 +227,7 @@ fn get_random_window_frame(rng: &mut StdRng) -> WindowFrame {
     } else if rand_num < 2 {
         WindowFrameUnits::Rows
     } else {
-        // For now we do not support GROUPS in BoundedWindowAggExec implementation
-        // TODO: once GROUPS handling is available, use WindowFrameUnits::GROUPS in randomized tests also.
-        WindowFrameUnits::Range
+        WindowFrameUnits::Groups

Review Comment:
   👍 



##########
datafusion/physical-expr/src/window/partition_evaluator.rs:
##########
@@ -38,9 +38,15 @@ pub trait PartitionEvaluator: Debug + Send {
         Ok(BuiltinWindowState::Default)
     }
 
+    /// Updates the internal state for Built-in window function

Review Comment:
   thank you for adding these comments



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #5587: Do not break pipeline for window queries with GROUPS

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5587:
URL: https://github.com/apache/arrow-datafusion/pull/5587#issuecomment-1472179746

   I plan to review this later today


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #5587: Do not break pipeline for window queries with GROUPS

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #5587:
URL: https://github.com/apache/arrow-datafusion/pull/5587#discussion_r1139268375


##########
datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs:
##########
@@ -548,9 +548,9 @@ impl SortedPartitionByBoundedWindowStream {
             for (partition_row, WindowState { state: value, .. }) in window_agg_state {
                 let n_prune =
                     min(value.window_frame_range.start, value.last_calculated_index);
-                if let Some(state) = n_prune_each_partition.get_mut(partition_row) {
-                    if n_prune < *state {
-                        *state = n_prune;
+                if let Some(current) = n_prune_each_partition.get_mut(partition_row) {

Review Comment:
   Right, this is a drive-by cleanup. There were too many `state`s going around making it hard to read 🙂 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org