Posted to github@arrow.apache.org by "mustafasrepo (via GitHub)" <gi...@apache.org> on 2023/04/17 16:00:28 UTC

[GitHub] [arrow-datafusion] mustafasrepo opened a new pull request, #6034: Do not break pipeline in aggregation if group by columns are ordered

mustafasrepo opened a new pull request, #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034

   # Which issue does this PR close?
   
   
   Closes [#5133](https://github.com/apache/arrow-datafusion/issues/5133).
   
   # Rationale for this change
   As discussed in the [document](https://docs.google.com/document/d/16rm5VR1nGkY6DedMCh1NUmThwf3RduAweaBH9b1h6AY/edit?usp=sharing), if some of the expressions in the `GROUP BY` clause are already ordered, we can generate aggregation results without breaking the pipeline. Consider the query below:
   ```sql
   SELECT a, b,
      SUM(c) as summation1
      FROM annotated_data
      GROUP BY a, b
   ```
   If the source is ordered by `a, b`, we can calculate the group by values in a streaming fashion (this corresponds to `Fully Streaming` in the document). When the value of the `a, b` columns changes, the corresponding group will not receive any more values (otherwise the ordering would be violated).
   
   If the source is ordered only by `a`, we can still calculate the group by values in a streaming fashion (this corresponds to `Partial Streaming` in the document). In this case, however, results are generated when the value of column `a` changes.
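   
   The fully streaming case can be illustrated with a minimal, self-contained sketch. This is not the PR's implementation: `streaming_sum` and its tuple-based rows are hypothetical names chosen for illustration. It assumes the input is already ordered by `(a, b)` and emits a group as soon as the key changes, so only one group is held in memory at a time.
   
   ```rust
   /// Sum `c` per `(a, b)` group, assuming `rows` is already sorted by `(a, b)`.
   /// `emit` is called as soon as a group is known to be complete, i.e. when
   /// the key changes, so no table of all groups has to be kept.
   /// (Hypothetical helper, for illustration only.)
   fn streaming_sum(
       rows: impl IntoIterator<Item = (i64, i64, i64)>,
       mut emit: impl FnMut((i64, i64), i64),
   ) {
       // The single group that may still receive rows.
       let mut current: Option<((i64, i64), i64)> = None;
       for (a, b, c) in rows {
           let key = (a, b);
           if let Some((cur_key, sum)) = current.as_mut() {
               if *cur_key == key {
                   *sum += c;
                   continue;
               }
           }
           // The key changed (or this is the first row): because the input is
           // sorted, the previous group cannot receive any more rows.
           if let Some((done_key, done_sum)) = current.take() {
               emit(done_key, done_sum);
           }
           current = Some((key, c));
       }
       // End of input: the last group is complete as well.
       if let Some((done_key, done_sum)) = current {
           emit(done_key, done_sum);
       }
   }
   ```
   
   With an unsorted input the same key could reappear later, which is why a general hash aggregation has to wait for the end of its input before emitting anything.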
   
   # What changes are included in this PR?
   This PR enables us to produce non-pipeline-breaking results for the `Fully Streaming` and `Partial Streaming` conditions. Please see the [document](https://docs.google.com/document/d/16rm5VR1nGkY6DedMCh1NUmThwf3RduAweaBH9b1h6AY/edit?usp=sharing) for a more detailed discussion and for what these terms refer to.
   
   The behavior of the executor changes with the existing ordering: if the result can be calculated in a streaming fashion, the output will have an ordering; otherwise it will not. This functionality therefore requires us to calculate `output_ordering` dynamically. For this reason, this PR is accompanied by an API change from `fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>` to `fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>>`. See the corresponding [PR](https://github.com/synnada-ai/arrow-datafusion/pull/78) for more information.
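   
   To illustrate why a dynamically computed ordering pushes the signature towards an owned return type, here is a minimal sketch. `SortExpr` and `Aggregate` are hypothetical stand-ins, not DataFusion types, and the rule used to derive the ordering (the leading input sort columns that also appear in the group by list) is an assumption made for illustration only.
   
   ```rust
   /// Hypothetical stand-in for a physical sort expression.
   #[derive(Clone, Debug, PartialEq)]
   struct SortExpr {
       column: String,
       descending: bool,
   }
   
   /// Hypothetical operator whose output ordering depends on its input.
   struct Aggregate {
       group_by: Vec<String>,
       input_ordering: Vec<SortExpr>,
   }
   
   impl Aggregate {
       /// The ordering is built on every call, so it lives in a local `Vec`.
       /// A borrowed return type such as `Option<&[SortExpr]>` could only
       /// hand out data that `self` already stores; an owned `Vec` can be
       /// returned directly.
       fn output_ordering(&self) -> Option<Vec<SortExpr>> {
           // Assumed rule: keep the leading sort columns that are also
           // group by columns.
           let prefix: Vec<SortExpr> = self
               .input_ordering
               .iter()
               .take_while(|e| self.group_by.contains(&e.column))
               .cloned()
               .collect();
           if prefix.is_empty() {
               None
           } else {
               Some(prefix)
           }
       }
   }
   ```
   
   The alternative that keeps the borrowed signature is to compute the ordering once, when the operator is constructed, and store it in a field.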
   
   # Are these changes tested?
   Yes, the `aggregate_fuzz.rs` file contains a randomized test that checks whether the streamed results and the existing version produce the same result.
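   
   The idea behind such a test can be sketched as follows. This is not the content of `aggregate_fuzz.rs`; every name here (`Lcg`, `baseline_sum`, `streaming_sum_sorted`, `random_sorted_rows`) is hypothetical, and a tiny hand-written generator replaces a real random number crate to keep the sketch dependency-free.
   
   ```rust
   use std::collections::BTreeMap;
   
   /// Tiny deterministic pseudo-random generator (a linear congruential
   /// generator), used only to avoid an external dependency.
   struct Lcg(u64);
   
   impl Lcg {
       fn next(&mut self) -> u64 {
           self.0 = self
               .0
               .wrapping_mul(6364136223846793005)
               .wrapping_add(1442695040888963407);
           self.0 >> 33
       }
   }
   
   /// Reference result: collect every group in a map, emit at the end.
   fn baseline_sum(rows: &[(u64, i64)]) -> Vec<(u64, i64)> {
       let mut map = BTreeMap::new();
       for &(k, v) in rows {
           *map.entry(k).or_insert(0) += v;
       }
       map.into_iter().collect()
   }
   
   /// Streaming result: relies on `rows` being sorted by key.
   fn streaming_sum_sorted(rows: &[(u64, i64)]) -> Vec<(u64, i64)> {
       let mut out: Vec<(u64, i64)> = Vec::new();
       for &(k, v) in rows {
           if let Some((last_k, sum)) = out.last_mut() {
               if *last_k == k {
                   *sum += v;
                   continue;
               }
           }
           out.push((k, v));
       }
       out
   }
   
   /// Generate `n` random rows with a small key domain, sorted by key.
   fn random_sorted_rows(rng: &mut Lcg, n: usize) -> Vec<(u64, i64)> {
       let mut rows: Vec<(u64, i64)> = (0..n)
           .map(|_| (rng.next() % 8, (rng.next() % 100) as i64))
           .collect();
       rows.sort_by_key(|&(k, _)| k);
       rows
   }
   ```
   
   A test would then generate many inputs of different sizes and assert that both functions return the same groups and sums.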
   
   Also, I have run the benchmarks and verified that there is no regression. The benchmark results can be found below.
   
   | Query | main | branch | Change |
   | -------- | ------- | ------- | ------- |
   | QQuery 1     |    2528.72ms |   2476.56ms | no change |
   | QQuery 2     |     430.39ms |     428.26ms | no change |
   | QQuery 3     |    1808.89ms |    1808.69ms | no change |
   | QQuery 4     |    1475.49ms |    1472.43ms | no change |
   | QQuery 5     |    1826.64ms |    1829.95ms | no change |
   | QQuery 6     |    1599.18ms |    1587.20ms | no change |
   | QQuery 7     |    2123.82ms |    2119.41ms | no change |
   | QQuery 8     |    1829.00ms |    1820.43ms | no change |
   | QQuery 9     |    2229.39ms |    2193.94ms | no change |
   | QQuery 10    |    1827.89ms |    1797.32ms | no change |
   | QQuery 11    |     416.38ms |     417.49ms | no change |
   | QQuery 12    |    1655.84ms |    1628.03ms | no change |
   | QQuery 13    |     673.42ms |     686.45ms | no change |
   | QQuery 14    |    1550.93ms |    1518.83ms | no change |
   | QQuery 15    |    3061.58ms |    2999.56ms | no change |
   | QQuery 16    |     229.33ms |     237.00ms | no change |
   | QQuery 17    |    5935.75ms |    5963.68ms | no change |
   | QQuery 18    |    4655.64ms |    4653.72ms | no change |
   | QQuery 19    |    1829.92ms |    1794.32ms | no change |
   | QQuery 20    |    2137.18ms |    2129.47ms | no change |
   | QQuery 21    |    4349.41ms |    4234.84ms | no change |
   | QQuery 22    |     355.48ms |     352.52ms | no change |
   
   
   # Are there any user-facing changes?
   
   `api change`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1523421786

   I have implemented version 2. You can find it [here](https://github.com/apache/arrow-datafusion/pull/6124). I have measured its performance; the results can be found below.
   | Query | main | branch | Change|
   |--------  |--------  | -------- |-------- |
   | QQuery 1     |    1894.27ms |    1930.49ms | no change |
   | QQuery 2     |     353.20ms |     350.90ms | no change |
   | QQuery 3     |    1494.18ms |    1518.88ms | no change |
   | QQuery 4     |    1458.10ms |    1487.40ms | no change |
   | QQuery 5     |    1535.74ms |    1553.12ms | no change |
   | QQuery 6     |    1193.48ms |    1219.73ms | no change |
   | QQuery 7     |    1840.05ms |    1854.09ms | no change |
   | QQuery 8     |    1548.89ms |    1558.78ms | no change |
   | QQuery 9     |    1817.28ms |    1837.66ms | no change |
   | QQuery 10    |    1500.78ms |    1524.82ms | no change |
   | QQuery 11    |     344.51ms |     347.98ms | no change |
   | QQuery 12    |    1648.40ms |    1678.82ms | no change |
   | QQuery 13    |     635.02ms |     644.33ms | no change |
   | QQuery 14    |    1249.13ms |    1277.28ms | no change |
   | QQuery 15    |    2403.47ms |    2449.88ms | no change |
   | QQuery 16    |     225.48ms |     227.18ms | no change |
   | QQuery 17    |    3598.06ms |    3628.50ms | no change |
   | QQuery 18    |    3406.65ms |    3458.96ms | no change |
   | QQuery 19    |    1423.90ms |    1457.02ms | no change |
   | QQuery 20    |    1567.27ms |    1592.57ms | no change |
   | QQuery 21    |    4310.73ms |    4371.10ms | no change |
   | QQuery 22    |     341.31ms |     348.51ms | no change |
   




[GitHub] [arrow-datafusion] mustafasrepo commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1521511462

   Regarding the performance downgrade: I have examined the code to see where it occurs. In the new implementation we extend `GroupState` to be able to determine whether a group is prunable or not, and to prune it safely.
   Specifically, we were adding `ordered_columns: Vec<ScalarValue>` to store the ordered section of the group (when this changes, the group is finalized and can be pruned). This API required us to create an empty vector for each group, even for groups that are not prunable in theory, which may degrade performance in high-cardinality cases. I have changed the type of this member from `ordered_columns: Vec<ScalarValue>` to `ordered_columns: Option<Vec<ScalarValue>>` to prevent unnecessary empty vector creation. Below are my test results.
   
   | Query | main | branch | Change|
   |--------  |--------  | -------- |-------- |
   | QQuery 1     |    1887.95ms |    1896.02ms | no change |
   | QQuery 2     |     347.76ms |     349.14ms | no change |
   | QQuery 3     |    1473.75ms |    1475.31ms | no change |
   | QQuery 4     |    1442.71ms |    1436.79ms | no change |
   | QQuery 5     |    1497.76ms |    1501.52ms | no change |
   | QQuery 6     |    1171.16ms |    1174.17ms | no change |
   | QQuery 7     |    1801.61ms |    1801.15ms | no change |
   | QQuery 8     |    1500.70ms |    1503.70ms | no change |
   | QQuery 9     |    1771.67ms |    1771.04ms | no change |
   | QQuery 10    |    1464.79ms |    1467.14ms | no change |
   | QQuery 11    |     348.81ms |     349.58ms | no change |
   | QQuery 12    |    1623.71ms |    1613.43ms | no change |
   | QQuery 13    |     640.64ms |     643.67ms | no change |
   | QQuery 14    |    1225.25ms |    1228.61ms | no change |
   | QQuery 15    |    2350.93ms |    2362.08ms | no change |
   | QQuery 16    |     224.96ms |     230.69ms | no change |
   | QQuery 17    |    3580.82ms |    3584.29ms | no change |
   | QQuery 18    |    3396.31ms |    3425.83ms | no change |
   | QQuery 19    |    1401.42ms |    1405.69ms | no change |
   | QQuery 20    |    1543.74ms |    1576.21ms | no change |
   | QQuery 21    |    4267.18ms |    4258.72ms | no change |
   | QQuery 22    |     341.99ms |     343.75ms | no change |
   
   However, I cannot say definitively that this has fixed the problem.
   I think we have 3 options:
   1. Extend `GroupedHashAggregateStream` to support streaming aggregation (current approach).
   2. Create a new kind of `Stream` for streaming aggregation (use it from `AggregateExec`).
   3. Create a new kind of `Executor`, such as `StreamingAggregateExec`, for streaming use cases.
   
   My judgement for each case is as follows.
   Pros of the 1st approach:
   - Introduces the least amount of change and reuses a lot of common code
   
   Cons of the 1st approach:
   - Possibly degrades performance
   - Extends the state with members that are not used in all cases
   
   Pros of the 2nd approach:
   - No performance penalty
   - Clear state
   - We can still produce non-pipeline-breaking results whenever there is an existing ordering for the group by expressions
   
   Pros of the 3rd approach:
   - No performance penalty
   - Clear state
   
   Cons of the 3rd approach:
   - We can only produce non-pipeline-breaking results when the source is marked as unbounded. (For bounded cases, even if there is an existing ordering that would allow non-pipeline-breaking results, we cannot produce them without an additional rule to change the executor.)
   
   We can pursue any one of the above approaches. If the community has a preference, we can pursue that approach.




[GitHub] [arrow-datafusion] alamb commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1520485851

   For reference, here is the same benchmark run against `main` itself: 
   
   ```
   ****** TPCH SF1 (Parquet) ******
   + python3 /home/alamb/arrow-datafusion/benchmarks/compare.py /home/alamb/benchmarking/alamb-main/tpch_sf1_parquet_main.json /home/alamb/benchmarking/alamb-main/tpch_sf1_parquet\
   _branch.json
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃ /home/alamb… ┃ /home/alamb… ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 1     │    1430.86ms │    1423.29ms │     no change │
   │ QQuery 2     │     399.75ms │     405.00ms │     no change │
   │ QQuery 3     │     520.40ms │     525.56ms │     no change │
   │ QQuery 4     │     218.29ms │     223.87ms │     no change │
   │ QQuery 5     │     693.57ms │     685.46ms │     no change │
   │ QQuery 6     │     416.62ms │     423.02ms │     no change │
   │ QQuery 7     │    1258.17ms │    1243.79ms │     no change │
   │ QQuery 8     │     690.25ms │     687.29ms │     no change │
   │ QQuery 9     │    1304.02ms │    1288.01ms │     no change │
   │ QQuery 10    │     770.91ms │     748.94ms │     no change │
   │ QQuery 11    │     356.32ms │     336.55ms │ +1.06x faster │
   │ QQuery 12    │     335.14ms │     329.12ms │     no change │
   │ QQuery 13    │    1170.83ms │    1146.78ms │     no change │
   │ QQuery 14    │     422.25ms │     421.47ms │     no change │
   │ QQuery 15    │     391.14ms │     381.71ms │     no change │
   │ QQuery 16    │     348.38ms │     344.13ms │     no change │
   │ QQuery 17    │    2860.96ms │    2838.27ms │     no change │
   │ QQuery 18    │    3726.11ms │    3734.67ms │     no change │
   │ QQuery 19    │     728.53ms │     737.35ms │     no change │
   │ QQuery 20    │    1250.75ms │    1208.06ms │     no change │
   │ QQuery 21    │    1688.40ms │    1757.45ms │     no change │
   │ QQuery 22    │     192.36ms │     190.43ms │     no change │
   └──────────────┴──────────────┴──────────────┴───────────────┘
   + echo '****** TPCH SF1 (mem) ******'
   ****** TPCH SF1 (mem) ******
   + python3 /home/alamb/arrow-datafusion/benchmarks/compare.py /home/alamb/benchmarking/alamb-main/tpch_sf1_mem_main.json /home/alamb/benchmarking/alamb-main/tpch_sf1_mem_branch.\
   json
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃           -o ┃           -o ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 1     │     759.07ms │     770.73ms │     no change │
   │ QQuery 2     │     269.81ms │     291.05ms │  1.08x slower │
   │ QQuery 3     │     180.67ms │     161.61ms │ +1.12x faster │
   │ QQuery 4     │     105.16ms │     105.46ms │     no change │
   │ QQuery 5     │     467.46ms │     466.11ms │     no change │
   │ QQuery 6     │      38.08ms │      42.72ms │  1.12x slower │
   │ QQuery 7     │    1170.05ms │    1147.43ms │     no change │
   │ QQuery 8     │     249.06ms │     238.74ms │     no change │
   │ QQuery 9     │     613.38ms │     609.98ms │     no change │
   │ QQuery 10    │     342.67ms │     327.23ms │     no change │
   │ QQuery 11    │     279.84ms │     281.69ms │     no change │
   │ QQuery 12    │     143.94ms │     146.57ms │     no change │
   │ QQuery 13    │     676.22ms │     668.79ms │     no change │
   │ QQuery 14    │      53.06ms │      51.73ms │     no change │
   │ QQuery 15    │      98.47ms │      92.68ms │ +1.06x faster │
   │ QQuery 16    │     244.93ms │     257.73ms │  1.05x slower │
   │ QQuery 17    │    2473.33ms │    2503.47ms │     no change │
   │ QQuery 18    │    3150.26ms │    3169.32ms │     no change │
   │ QQuery 19    │     154.90ms │     150.53ms │     no change │
   │ QQuery 20    │     969.12ms │     929.69ms │     no change │
   │ QQuery 21    │    1476.10ms │    1457.34ms │     no change │
   │ QQuery 22    │     148.71ms │     143.20ms │     no change │
   └──────────────┴──────────────┴──────────────┴───────────────┘
   ```




[GitHub] [arrow-datafusion] mingmwang commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1521324807

   @ozankabak @mustafasrepo 
   
   I strongly suggest having a separate implementation (Exec) for streaming aggregation. This is similar to how we separate `HashJoinExec`/`SortMergeJoinExec` and `UnionExec`/`InterleaveExec`.
   With the physical plans split, the plans clearly convey which physical operators they are composed of.
   We can also keep each operator's code base (HashAggregation and SortAggregation) relatively simple.
   We can further keep a relatively lightweight grouping state for each operator. The memory layout of the grouping state is critical for performance. For hash aggregation performance, we currently still have a huge gap compared with DuckDB.
   
   
   
   




[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1176177564


##########
datafusion-examples/examples/custom_datasource.rs:
##########
@@ -217,7 +217,7 @@ impl ExecutionPlan for CustomExec {
         datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+    fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>> {

Review Comment:
   We agreed that this is unnecessary. I have reverted to the old API.





[GitHub] [arrow-datafusion] ozankabak commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1522548414

   > I'm not very familiar with the group by code anymore, but the use of per-group allocations does immediately stand out to me as at risk of thrashing the memory allocator, and consequently making the code not just slow but also wildly unpredictable. I'm aware this isn't something introduced by this PR, but revisiting this design (see https://github.com/apache/arrow-datafusion/issues/4973) may make it easier to make changes in this area without introducing regressions.
   
   I agree, we will be happy to take part in that effort.
   
   Going back to the scope of this PR: If everyone agrees, we will generate another changeset with Approach 2, and @alamb can check how the performance looks there (since our local benchmarks do not show a difference even now).
   
   We can then move forward with either that, or this version. Sounds good?




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1176203940


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -591,38 +787,93 @@ impl GroupedHashAggregateStream {
                 }
                 let batch_indices = batch_indices.finish();
 
-                let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
-                let normal_values =
-                    get_at_indices(&normal_aggr_input_values, &batch_indices)?;
                 let row_filter_values =
                     get_optional_filters(&row_filter_values, &batch_indices);
                 let normal_filter_values =
                     get_optional_filters(&normal_filter_values, &batch_indices);
-                self.update_accumulators_using_batch(
-                    &groups_with_rows,
-                    &offsets,
-                    &row_values,
-                    &normal_values,
-                    &row_filter_values,
-                    &normal_filter_values,
-                    &mut allocated,
-                )?;
+                if self
+                    .aggregation_ordering
+                    .as_ref()
+                    .map_or(false, |s| s.mode == GroupByOrderMode::Ordered)
+                {
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_aggr_input_values,
+                        &normal_aggr_input_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                } else {
+                    let row_values =
+                        get_at_indices(&row_aggr_input_values, &batch_indices)?;
+                    let normal_values =
+                        get_at_indices(&normal_aggr_input_values, &batch_indices)?;
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_values,
+                        &normal_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                };
             }
         }
         allocated += self
             .row_converter
             .size()
             .saturating_sub(row_converter_size_pre);
+
+        if self.aggregation_ordering.is_some() {
+            let mut new_result = false;
+            let last_ordered_columns = self
+                .aggr_state
+                .group_states
+                .last()
+                .map(|item| item.ordered_columns.clone());
+
+            if let Some(last_ordered_columns) = last_ordered_columns {
+                for cur_group in &mut self.aggr_state.group_states {
+                    if cur_group.ordered_columns != last_ordered_columns {
+                        // We will no longer receive value. Set status to GroupStatus::CanEmit
+                        // meaning we can generate result for this group.
+                        cur_group.status = GroupStatus::CanEmit;
+                        new_result = true;
+                    }
+                }
+            }
+            if new_result {
+                self.exec_state = ExecutionState::ProducingOutput;
+            }
+        }
+
         Ok(allocated)
     }
 }
 
+#[derive(Debug, PartialEq)]
+enum GroupStatus {
+    // `GroupProgress` means data for current group is not complete. New data may arrive.
+    GroupProgress,
+    // `CanEmit` means data for current group is completed. And its result can emitted.
+    CanEmit,
+    // Emitted means that result for the groups is outputted. Group can be pruned from state.
+    Emitted,
+}
+
 /// The state that is built for each output group.
 #[derive(Debug)]
 pub struct GroupState {
     /// The actual group by values, stored sequentially
     group_by_values: OwnedRow,
 
+    ordered_columns: Option<Vec<ScalarValue>>,

Review Comment:
   I think this is not efficient. We should avoid using a Vec of Vecs in critical data structures. A Vec is essentially a pointer to some other memory address.
   Because the `GroupState` is held in a global Vec, if we store `ordered_columns` in another Vec, the memory access pattern will be very random when the code accesses those members.
   
   You can implement something similar to Arrow's `GenericStringArray` to achieve a better memory layout.
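   
   One way to read this suggestion is a single flat values buffer plus an offsets buffer, in the spirit of Arrow's variable-length arrays. The sketch below is only an illustration of that layout under stated assumptions: `FlatGroupValues` is a hypothetical type, and the generic `T` stands in for `ScalarValue`.
   
   ```rust
   /// Flat storage for variable-length per-group values: one contiguous
   /// `values` buffer plus an `offsets` buffer, instead of one `Vec` per
   /// group. (Hypothetical type, for illustration only.)
   #[derive(Debug)]
   struct FlatGroupValues<T> {
       /// The values of all groups, stored back to back.
       values: Vec<T>,
       /// `offsets[i]..offsets[i + 1]` is the range of `values` that
       /// belongs to group `i`. Always starts with a single `0`.
       offsets: Vec<usize>,
   }
   
   impl<T> FlatGroupValues<T> {
       fn new() -> Self {
           Self { values: Vec::new(), offsets: vec![0] }
       }
   
       /// Append the values of a new group and return its index.
       fn push_group(&mut self, group: impl IntoIterator<Item = T>) -> usize {
           self.values.extend(group);
           self.offsets.push(self.values.len());
           self.offsets.len() - 2
       }
   
       /// Borrow the values of group `group` as one contiguous slice.
       fn get(&self, group: usize) -> &[T] {
           &self.values[self.offsets[group]..self.offsets[group + 1]]
       }
   
       /// Number of groups stored.
       fn len(&self) -> usize {
           self.offsets.len() - 1
       }
   }
   ```
   
   Scanning all groups then walks two contiguous buffers instead of following one pointer per group. The trade-off is that removing a group from the middle is no longer cheap, which matters for a design that prunes emitted groups.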
   
   





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1176301909


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -663,12 +914,32 @@ impl std::fmt::Debug for AggregationState {
 }
 
 impl GroupedHashAggregateStream {
+    /// Prune the groups from the `self.aggr_state.group_states` which are in
+    /// `GroupStatus::Emitted`(this status means that result of this group emitted/outputted already, and
+    /// we are sure that these groups cannot receive new rows.) status.
+    fn prune(&mut self) {

Review Comment:
   `ordered_columns` stores the section of the group by expression that defines the ordering in `GroupState`. When a different `ordered_columns` value is received, we are sure that the previous groups with different `ordered_columns` are finalized (they will no longer receive new values). At the end of `group_aggregate_batch` we iterate over `self.aggr_state.group_states` and mark as prunable the groups whose `ordered_columns` differ from the `ordered_columns` of the most recent (last) group.
   
   As an example, if the table is like below, and we know that it satisfies `ORDER BY a ASC`
   |a|
   |----|
   |1|
   |1|
   |2|
   |2|
   |3|
   |3|
   
   and the group by clause is `GROUP BY a`, the groups with `ordered_columns = Some(vec![1])` and `ordered_columns = Some(vec![2])` will be pruned, since they differ from `ordered_columns = Some(vec![3])`. However, the last group is not pruned, because we can still receive rows with the value 3 for column `a`.
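   
   The marking step described here can be sketched in isolation. The types below are simplified, hypothetical stand-ins (`Group`, a two-state `GroupStatus`, `i64` keys instead of `ScalarValue`), and unlike the loop in the PR this version skips groups that were already marked, so that the return value only reports newly finished groups.
   
   ```rust
   #[derive(Debug, PartialEq, Clone, Copy)]
   enum GroupStatus {
       /// The group may still receive rows.
       InProgress,
       /// The group is complete and its result can be emitted.
       CanEmit,
   }
   
   /// Simplified, hypothetical stand-in for the per-group state.
   #[derive(Debug)]
   struct Group {
       ordered_columns: Vec<i64>,
       status: GroupStatus,
   }
   
   /// Mark every in-progress group whose ordered key differs from the key of
   /// the most recent (last) group as complete. Returns `true` if at least
   /// one group was newly marked.
   fn mark_finished(groups: &mut [Group]) -> bool {
       let last = match groups.last() {
           Some(g) => g.ordered_columns.clone(),
           None => return false,
       };
       let mut any_new = false;
       for g in groups.iter_mut() {
           if g.status == GroupStatus::InProgress && g.ordered_columns != last {
               g.status = GroupStatus::CanEmit;
               any_new = true;
           }
       }
       any_new
   }
   ```
   
   For the table above, the groups for `a = 1` and `a = 2` end up as `CanEmit`, while the group for `a = 3` stays `InProgress`.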





[GitHub] [arrow-datafusion] mustafasrepo commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1519828816

   > Should we create a different group stream instead of adding new methods to `GroupedHashAggregateStream` ?
   
   Since most of the code is common with existing `GroupedHashAggregateStream`, we chose this implementation. Otherwise we would need to duplicate a lot of code. 




[GitHub] [arrow-datafusion] mingmwang commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1519648269

   Should we create a different group stream instead of adding new methods to `GroupedHashAggregateStream` ?




[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1176147142


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -313,22 +331,144 @@ impl RecordBatchStream for GroupedHashAggregateStream {
     }
 }
 
+/// This utility object encapsulates the row object, the hash and the group
+/// indices for a group. This information is used when executing streaming
+/// GROUP BY calculations.
+struct GroupOrderInfo {
+    owned_row: OwnedRow,
+    hash: u64,
+    range: Range<usize>,
+}
+
 impl GroupedHashAggregateStream {
-    // Update the row_aggr_state according to groub_by values (result of group_by_expressions)
+    // Update the aggr_state according to groub_by values (result of group_by_expressions) when group by

Review Comment:
   Fixed it





[GitHub] [arrow-datafusion] alamb commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1518603490

   FYI @yjshen @Dandandan @yahoNanJing  and @mingmwang  who seem to have been working on aggregation performance recently -- this PR adds streaming aggregation




[GitHub] [arrow-datafusion] alamb commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1522274324

   I am running the benchmarks again




[GitHub] [arrow-datafusion] mingmwang commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1523022386

   > > Create a new kind of Stream for streaming aggregation (use it from AggregateExec).
   > 
   > This is what makes sense to me, though I am not sure what @mingmwang thinks
   
   Yes, I am OK with Option 2 as a follow-up ticket.




[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1174949244


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -822,13 +930,127 @@ mod tests {
     use std::task::{Context, Poll};
 
     use super::StreamType;
+    use crate::physical_plan::aggregates::GroupByOrderMode::{Ordered, PartiallyOrdered};
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::{
         ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
         Statistics,
     };
     use crate::prelude::SessionContext;
 
+    // Generate a schema which consists of 5 columns (a, b, c, d, e)
+    fn create_test_schema() -> Result<SchemaRef> {
+        let a = Field::new("a", DataType::Int32, true);
+        let b = Field::new("b", DataType::Int32, true);
+        let c = Field::new("c", DataType::Int32, true);
+        let d = Field::new("d", DataType::Int32, true);
+        let e = Field::new("e", DataType::Int32, true);
+        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
+
+        Ok(schema)
+    }
+
+    // Created a sorted parquet exec
+    fn csv_exec_sorted(

Review Comment:
   Removed it. Thanks for pointing it out





[GitHub] [arrow-datafusion] tustvold commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1522449963

   > to prevent unnecessary empty vector creation
   
   FWIW an empty vector just contains a `NonNull::dangling`, it doesn't allocate anything, so wrapping it in an `Option` is redundant. That being said the additions to `GroupState` will make it non-trivially larger, which could conceivably cause performance regressions.
   
   I'm not very familiar with the group by code anymore, but the use of per-group allocations does immediately stand out to me as at risk of thrashing the memory allocator, and consequently making the code not just slow but also wildly unpredictable. I'm aware this isn't something introduced by this PR, but revisiting this design (see #4973) may make it easier to make changes in this area without introducing regressions.
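   
   The two size claims above can be checked with a minimal sketch. `SmallState` and `LargerState` are hypothetical stand-ins, not the real `GroupState`, and the type chosen for `ordered_columns` is an assumption made only for illustration.
   
   ```rust
   use std::mem::size_of;

   /// Hypothetical stand-in for a per-group state; not the real `GroupState`.
   struct SmallState {
       indices: Vec<u32>,
   }

   /// The same state with one extra (hypothetical) bookkeeping vector per group.
   struct LargerState {
       indices: Vec<u32>,
       ordered_columns: Vec<u64>,
   }

   /// An empty `Vec` holds a dangling pointer and reserves no heap capacity.
   fn empty_vec_capacity() -> usize {
       Vec::<u32>::new().capacity()
   }

   /// `Vec`'s pointer is never null, so `Option<Vec<T>>` needs no extra space
   /// compared to `Vec<T>`.
   fn option_adds_no_space() -> bool {
       size_of::<Option<Vec<u32>>>() == size_of::<Vec<u32>>()
   }

   /// Bytes added to every group by the extra field, even when it stays empty.
   fn extra_bytes_per_group() -> usize {
       size_of::<LargerState>() - size_of::<SmallState>()
   }
   ```
   
   Under these assumptions the `Option` wrapper saves neither an allocation nor space, while each additional `Vec` field still grows every group's state by the size of one `Vec` header.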
   




[GitHub] [arrow-datafusion] ozankabak commented on pull request #6034: Do not break pipeline in aggregation if group by columns are ordered

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1513688603

   @xiaoyong-z @Ted-Jiang it'd be great if you can take a look -- this is a feature you are interested in AFAICT




[GitHub] [arrow-datafusion] alamb commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1522342001

   @tustvold  do you have any thoughts about why this code / PR could be making the grouping operator seemingly slow down?




[GitHub] [arrow-datafusion] alamb commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1520193728

   I ran some preliminary benchmarks against this branch and it seems like some queries have gotten slightly slower:
   
   ```
   alamb@aal-dev:~/benchmarking/feature%2Fstream_groupby4$ python3 ~/arrow-datafusion/benchmarks/compare.py tpch_sf1_parquet_mem.json tpch_sf1_mem_branch.json
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
   ┃ Query        ┃           -o ┃           -o ┃       Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
   │ QQuery 1     │     770.75ms │     760.05ms │    no change │
   │ QQuery 2     │     289.80ms │     312.71ms │ 1.08x slower │
   │ QQuery 3     │     174.31ms │     175.09ms │    no change │
   │ QQuery 4     │     106.65ms │     104.51ms │    no change │
   │ QQuery 5     │     477.41ms │     480.83ms │    no change │
   │ QQuery 6     │      38.15ms │      37.78ms │    no change │
   │ QQuery 7     │    1071.70ms │    1082.32ms │    no change │
   │ QQuery 8     │     252.64ms │     264.53ms │    no change │
   │ QQuery 9     │     581.89ms │     598.15ms │    no change │
   │ QQuery 10    │     332.62ms │     339.50ms │    no change │
   │ QQuery 11    │     282.02ms │     291.65ms │    no change │
   │ QQuery 12    │     145.87ms │     152.48ms │    no change │
   │ QQuery 13    │     679.94ms │     680.18ms │    no change │
   │ QQuery 14    │      59.35ms │      58.90ms │    no change │
   │ QQuery 15    │      96.58ms │      96.56ms │    no change │
   │ QQuery 16    │     251.37ms │     266.31ms │ 1.06x slower │
   │ QQuery 17    │    2435.04ms │    2539.73ms │    no change │
   │ QQuery 18    │    3021.24ms │    3272.84ms │ 1.08x slower │
   │ QQuery 19    │     142.99ms │     153.61ms │ 1.07x slower │
   │ QQuery 20    │     925.24ms │    1058.29ms │ 1.14x slower │
   │ QQuery 21    │    1423.51ms │    1407.18ms │    no change │
   │ QQuery 22    │     148.12ms │     144.86ms │    no change │
   └──────────────┴──────────────┴──────────────┴──────────────┘
   alamb@aal-dev:~/benchmarking/feature%2Fstream_groupby4$ python3 ~/arrow-datafusion/benchmarks/compare.py tpch_sf1_parquet_main.json  tpch_sf1_parquet_branch.json
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
   ┃ Query        ┃ /home/alamb… ┃ /home/alamb… ┃       Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
   │ QQuery 1     │    1470.88ms │    1456.73ms │    no change │
   │ QQuery 2     │     394.00ms │     422.56ms │ 1.07x slower │
   │ QQuery 3     │     564.83ms │     540.62ms │    no change │
   │ QQuery 4     │     222.21ms │     221.49ms │    no change │
   │ QQuery 5     │     717.36ms │     702.32ms │    no change │
   │ QQuery 6     │     460.41ms │     454.66ms │    no change │
   │ QQuery 7     │    1216.67ms │    1230.08ms │    no change │
   │ QQuery 8     │     717.35ms │     731.94ms │    no change │
   │ QQuery 9     │    1337.85ms │    1326.94ms │    no change │
   │ QQuery 10    │     765.05ms │     787.99ms │    no change │
   │ QQuery 11    │     337.95ms │     344.80ms │    no change │
   │ QQuery 12    │     329.03ms │     327.28ms │    no change │
   │ QQuery 13    │    1105.33ms │    1170.87ms │ 1.06x slower │
   │ QQuery 14    │     449.61ms │     450.68ms │    no change │
   │ QQuery 15    │     405.36ms │     417.38ms │    no change │
   │ QQuery 16    │     330.18ms │     349.17ms │ 1.06x slower │
   │ QQuery 17    │    2772.98ms │    2891.72ms │    no change │
   │ QQuery 18    │    3592.01ms │    3802.18ms │ 1.06x slower │
   │ QQuery 19    │     769.32ms │     771.99ms │    no change │
   │ QQuery 20    │    1237.75ms │    1326.82ms │ 1.07x slower │
   │ QQuery 21    │    1663.89ms │    1633.89ms │    no change │
   │ QQuery 22    │     197.52ms │     202.74ms │    no change │
   └──────────────┴──────────────┴──────────────┴──────────────┘
   ```
   
   Script I used is here: https://github.com/alamb/datafusion-benchmarking/blob/628151e3e3d27ff6e5242052d017f71dcd0d80ef/bench.sh
   
   I am rerunning the numbers to see if I can reproduce the results
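   
   For reading the `Change` column, here is a minimal sketch that reproduces the labels in the tables above. It assumes the label comes from the ratio of branch time to main time with a 5% "no change" band; `change_label` is a hypothetical helper written for this illustration, not the code in `compare.py`.
   
   ```rust
   /// Hypothetical helper: labels a timing change from the ratio of the two
   /// runs, treating anything within an assumed 5% band as "no change".
   fn change_label(main_ms: f64, branch_ms: f64) -> String {
       let ratio = branch_ms / main_ms;
       if ratio > 1.05 {
           format!("{:.2}x slower", ratio)
       } else if ratio < 0.95 {
           format!("+{:.2}x faster", 1.0 / ratio)
       } else {
           "no change".to_string()
       }
   }
   ```
   
   With the in-memory numbers for Query 2 (289.80ms and 312.71ms) this gives `1.08x slower`, and for Query 1 (770.75ms and 760.05ms) it gives `no change`.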




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1176250150


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -663,12 +914,32 @@ impl std::fmt::Debug for AggregationState {
 }
 
 impl GroupedHashAggregateStream {
+    /// Prune the groups from the `self.aggr_state.group_states` which are in
+    /// `GroupStatus::Emitted`(this status means that result of this group emitted/outputted already, and
+    /// we are sure that these groups cannot receive new rows.) status.
+    fn prune(&mut self) {

Review Comment:
   I think it is good to keep the global `group_states` relatively small. 
   But it is possible that in some cases only a small percentage is pruned, and we still pay for 
   the extra copies.   I'm not sure when the prune will be triggered.
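   
   One way to bound that cost is to prune only when enough groups have been emitted. The sketch below is an illustration of that idea, not what the PR does: `GroupState` is simplified, the `Active` variant and the `min_emitted_fraction` parameter are hypothetical, and only `GroupStatus::Emitted` and `group_states` are names taken from the diff.
   
   ```rust
   /// Hypothetical status flag; the diff only names the `Emitted` variant.
   #[derive(Debug, Clone, Copy, PartialEq, Eq)]
   enum GroupStatus {
       Active,
       Emitted,
   }

   /// Hypothetical, simplified per-group state.
   #[derive(Debug, Clone, PartialEq)]
   struct GroupState {
       key: u32,
       status: GroupStatus,
   }

   /// Removes emitted groups only if they make up at least
   /// `min_emitted_fraction` of all groups. Returns whether a prune happened.
   fn prune_if_worthwhile(
       group_states: &mut Vec<GroupState>,
       min_emitted_fraction: f64,
   ) -> bool {
       let total = group_states.len();
       let emitted = group_states
           .iter()
           .filter(|g| g.status == GroupStatus::Emitted)
           .count();
       if emitted == 0 || (emitted as f64) < min_emitted_fraction * total as f64 {
           // Too few emitted groups: compacting would copy most of the vector
           // while freeing very little.
           return false;
       }
       // `retain` shifts the surviving groups down in place.
       group_states.retain(|g| g.status != GroupStatus::Emitted);
       true
   }
   ```
   
   In a real hash aggregate, any table that stores indices into `group_states` would also have to be rebuilt after compaction, which this sketch leaves out.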





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1176250150


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -663,12 +914,32 @@ impl std::fmt::Debug for AggregationState {
 }
 
 impl GroupedHashAggregateStream {
+    /// Prune the groups from the `self.aggr_state.group_states` which are in
+    /// `GroupStatus::Emitted`(this status means that result of this group emitted/outputted already, and
+    /// we are sure that these groups cannot receive new rows.) status.
+    fn prune(&mut self) {

Review Comment:
   I think it is good to keep the global `group_states` relatively small. 
   But is it possible that in some cases only a small percentage is pruned, so that we still pay for the extra copies?   I'm not sure when the prune will be triggered.





[GitHub] [arrow-datafusion] ozankabak commented on pull request #6034: Do not break pipeline in aggregation if group by columns are ordered

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1511732797

   I am super excited about this! This one and its sister window PR (#6036) complete the two remaining major milestones for achieving streaming execution on amenable datasets. After these, the remaining changes will be smaller changes that polish and improve the foundation these PRs provide.




[GitHub] [arrow-datafusion] alamb commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1522341230

   ```
   + echo '****** TPCH SF1 (Parquet) ******'
   ****** TPCH SF1 (Parquet) ******
   + python3 /home/alamb/arrow-datafusion/benchmarks/compare.py /home/alamb/benchmarking/feature%2Fstream_groupby4/tpch_sf1_parquet_main.json /home/alamb/benchmarking/feature%2Fstream_groupby4/tpch_sf1_parqu\
   et_branch.json
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
   ┃ Query        ┃ /home/alamb… ┃ /home/alamb… ┃       Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
   │ QQuery 1     │    1440.81ms │    1466.22ms │    no change │
   │ QQuery 2     │     382.39ms │     404.11ms │ 1.06x slower │
   │ QQuery 3     │     543.05ms │     552.98ms │    no change │
   │ QQuery 4     │     231.01ms │     220.95ms │    no change │
   │ QQuery 5     │     693.54ms │     687.44ms │    no change │
   │ QQuery 6     │     426.04ms │     449.61ms │ 1.06x slower │
   │ QQuery 7     │    1185.19ms │    1188.84ms │    no change │
   │ QQuery 8     │     702.14ms │     709.04ms │    no change │
   │ QQuery 9     │    1300.47ms │    1316.84ms │    no change │
   │ QQuery 10    │     773.33ms │     785.76ms │    no change │
   │ QQuery 11    │     332.28ms │     353.08ms │ 1.06x slower │
   │ QQuery 12    │     330.60ms │     321.46ms │    no change │
   │ QQuery 13    │    1088.95ms │    1150.66ms │ 1.06x slower │
   │ QQuery 14    │     426.84ms │     438.99ms │    no change │
   │ QQuery 15    │     390.98ms │     400.00ms │    no change │
   │ QQuery 16    │     328.41ms │     351.25ms │ 1.07x slower │
   │ QQuery 17    │    2761.33ms │    2798.95ms │    no change │
   │ QQuery 18    │    3650.22ms │    3674.77ms │    no change │
   │ QQuery 19    │     724.28ms │     776.02ms │ 1.07x slower │
   │ QQuery 20    │    1214.26ms │    1322.29ms │ 1.09x slower │
   │ QQuery 21    │    1651.66ms │    1685.36ms │    no change │
   │ QQuery 22    │     196.26ms │     196.06ms │    no change │
   └──────────────┴──────────────┴──────────────┴──────────────┘
   + echo '****** TPCH SF1 (mem) ******'
   ****** TPCH SF1 (mem) ******
   + python3 /home/alamb/arrow-datafusion/benchmarks/compare.py /home/alamb/benchmarking/feature%2Fstream_groupby4/tpch_sf1_mem_main.json /home/alamb/benchmarking/feature%2Fstream_groupby4/tpch_sf1_mem_branc\
   h.json
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃           -o ┃           -o ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 1     │     758.37ms │     778.09ms │     no change │
   │ QQuery 2     │     277.40ms │     309.41ms │  1.12x slower │
   │ QQuery 3     │     173.75ms │     188.35ms │  1.08x slower │
   │ QQuery 4     │     113.69ms │     110.27ms │     no change │
   │ QQuery 5     │     475.95ms │     461.74ms │     no change │
   │ QQuery 6     │      36.47ms │      37.38ms │     no change │
   │ QQuery 7     │    1083.01ms │    1063.32ms │     no change │
   │ QQuery 8     │     259.31ms │     246.75ms │     no change │
   │ QQuery 9     │     624.32ms │     575.27ms │ +1.09x faster │
   │ QQuery 10    │     310.52ms │     351.58ms │  1.13x slower │
   │ QQuery 11    │     284.14ms │     282.22ms │     no change │
   │ QQuery 12    │     148.13ms │     145.27ms │     no change │
   │ QQuery 13    │     659.01ms │     719.22ms │  1.09x slower │
   │ QQuery 14    │      52.88ms │      48.78ms │ +1.08x faster │
   │ QQuery 15    │      90.71ms │     103.68ms │  1.14x slower │
   │ QQuery 16    │     233.70ms │     258.77ms │  1.11x slower │
   │ QQuery 17    │    2403.54ms │    2550.52ms │  1.06x slower │
   │ QQuery 18    │    2964.21ms │    3217.22ms │  1.09x slower │
   │ QQuery 19    │     139.10ms │     149.13ms │  1.07x slower │
   │ QQuery 20    │     930.97ms │    1042.61ms │  1.12x slower │
   │ QQuery 21    │    1399.42ms │    1408.19ms │     no change │
   │ QQuery 22    │     140.09ms │     138.46ms │     no change │
   └──────────────┴──────────────┴──────────────┴───────────────┘
   ```
   
   I am running with https://github.com/alamb/datafusion-benchmarking/blob/87ee101b70b15dd4529f124d65189b0fb87e09b7/bench.sh
   
   Running on a gcp machine `e2-standard-8`:
   
   ```shell
   cat /proc/cpuinfo
   ...
   
   processor       : 7
   vendor_id       : GenuineIntel
   cpu family      : 6
   model           : 79
   model name      : Intel(R) Xeon(R) CPU @ 2.20GHz
   stepping        : 0
   microcode       : 0xffffffff
   cpu MHz         : 2200.164
   cache size      : 56320 KB
   physical id     : 0
   siblings        : 8
   core id         : 3
   cpu cores       : 4
   apicid          : 7
   initial apicid  : 7
   fpu             : yes
   fpu_exception   : yes
   cpuid level     : 13
   wp              : yes
   flags           : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc cpui\
   d tsc_known_freq pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single pti ssbd ibrs ibpb stibp fsgsbase tsc_adj\
   ust bmi1 hle avx2 smep bmi2 erms invpcid rtm rdseed adx smap xsaveopt arat md_clear arch_capabilities
   bugs            : cpu_meltdown spectre_v1 spectre_v2 spec_store_bypass l1tf mds swapgs taa mmio_stale_data retbleed
   bogomips        : 4400.32
   clflush size    : 64
   cache_alignment : 64
   address sizes   : 46 bits physical, 48 bits virtual
   power management:
   ```
   
   I am hoping to make the benchmarks easier to run / reproduce. I also plan to take another close look at this PR tomorrow
   
   




[GitHub] [arrow-datafusion] alamb commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1522343309

   > Create a new kind of Stream for streaming aggregation (use it from AggregateExec).
   
   This is what makes sense to me, though I am not sure what @mingmwang thinks




[GitHub] [arrow-datafusion] alamb commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1519084472

   I am running some benchmarks on this branch and will report back




[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1175082424


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -72,6 +74,15 @@ pub enum AggregateMode {
     Single,
 }
 
+/// Group By expression modes
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum GroupByOrderMode {
+    /// Some of the expressions in the GROUP BY clause have an ordering.
+    PartiallyOrdered,

Review Comment:
   The algorithms for finding group boundaries differ between the `Ordered` and `PartiallyOrdered` cases. We could have used a flag instead of this enum. However, I think this is more explicit. 
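   
   A minimal sketch of the difference, assuming a `GROUP BY a, b` query whose rows are modeled as `(a, b)` pairs. The local `GroupByOrderMode` mirrors the enum in the diff, while `runs` and `boundaries` are hypothetical helpers written for this illustration and are not the PR's implementation.
   
   ```rust
   use std::ops::Range;

   /// Local copy of the two modes discussed here.
   #[derive(Debug, Clone, Copy, PartialEq, Eq)]
   enum GroupByOrderMode {
       Ordered,
       PartiallyOrdered,
   }

   /// Splits `keys` into ranges of consecutive equal values.
   fn runs<T: PartialEq>(keys: &[T]) -> Vec<Range<usize>> {
       let mut ranges = Vec::new();
       let mut start = 0;
       for i in 1..=keys.len() {
           // Close the current run at the end of input or when the key changes.
           if i == keys.len() || keys[i] != keys[start] {
               ranges.push(start..i);
               start = i;
           }
       }
       ranges
   }

   /// Finds boundaries for rows of `(a, b)` under the given mode.
   fn boundaries(rows: &[(i32, i32)], mode: GroupByOrderMode) -> Vec<Range<usize>> {
       match mode {
           // Input sorted by (a, b): every run of equal keys is one complete group.
           GroupByOrderMode::Ordered => runs(rows),
           // Input sorted by `a` only: a run of equal `a` is a section that may
           // hold several groups, which still have to be located by hashing.
           GroupByOrderMode::PartiallyOrdered => {
               let a: Vec<i32> = rows.iter().map(|r| r.0).collect();
               runs(&a)
           }
       }
   }
   ```
   
   In the `Ordered` case each range is a finished group, so no hash lookup is needed to find it. In the `PartiallyOrdered` case a range only marks the point at which every group seen so far can be emitted.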





[GitHub] [arrow-datafusion] ozankabak commented on pull request #6034: Do not break pipeline in aggregation if group by columns are ordered

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1511727584

   The CI failure seems to be intermittent; it needs a re-run




[GitHub] [arrow-datafusion] ozankabak commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1521955995

   @alamb, can you try measuring again on your end? I wonder if you will find a result similar to @mustafasrepo's after his last change. If you also see no (or very little) performance change, I propose we get the ball rolling by merging this. We can always refactor the code to the 2nd approach with a follow-on PR.




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1175324013


##########
datafusion-examples/examples/custom_datasource.rs:
##########
@@ -217,7 +217,7 @@ impl ExecutionPlan for CustomExec {
         datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+    fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>> {

Review Comment:
   Why does this signature need to change? The new signature requires an allocation / `clone` of a Vec where the previous one didn't and thus this seems to change the API for the worse.
   
   If it is to support a calculated output in the grouping perhaps we can calculate the output ordering once in the constructor rather than on demand.
   
   ```
   
       fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>> {
           self.calc_aggregation_ordering().map(|state| state.ordering)
       }
   ```
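   
   A minimal sketch of the "calculate once in the constructor" alternative, which keeps the borrowed return type. `SortExpr` and `AggregateSketch` are hypothetical stand-ins for `PhysicalSortExpr` and the aggregate operator, and the prefix rule inside `new` is only a placeholder for whatever `calc_aggregation_ordering` computes.
   
   ```rust
   /// Hypothetical stand-in for `PhysicalSortExpr`.
   #[derive(Debug, Clone, PartialEq)]
   struct SortExpr {
       column: String,
       descending: bool,
   }

   /// Hypothetical operator that caches its output ordering.
   struct AggregateSketch {
       ordering: Option<Vec<SortExpr>>,
   }

   impl AggregateSketch {
       /// Computes the ordering once, at construction time.
       fn new(group_by: &[&str], input_ordering: &[SortExpr]) -> Self {
           // Placeholder rule (an assumption): keep the leading sort
           // expressions whose columns also appear in the GROUP BY list.
           let prefix: Vec<SortExpr> = input_ordering
               .iter()
               .take_while(|e| group_by.contains(&e.column.as_str()))
               .cloned()
               .collect();
           let ordering = if prefix.is_empty() { None } else { Some(prefix) };
           Self { ordering }
       }

       /// Borrows the cached ordering; no allocation or clone per call.
       fn output_ordering(&self) -> Option<&[SortExpr]> {
           self.ordering.as_deref()
       }
   }
   ```
   
   Here `Option::as_deref` turns the stored `Option<Vec<SortExpr>>` into `Option<&[SortExpr]>`, so callers keep the original signature.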



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -591,38 +785,93 @@ impl GroupedHashAggregateStream {
                 }
                 let batch_indices = batch_indices.finish();
 
-                let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
-                let normal_values =
-                    get_at_indices(&normal_aggr_input_values, &batch_indices)?;
                 let row_filter_values =
                     get_optional_filters(&row_filter_values, &batch_indices);
                 let normal_filter_values =
                     get_optional_filters(&normal_filter_values, &batch_indices);
-                self.update_accumulators_using_batch(
-                    &groups_with_rows,
-                    &offsets,
-                    &row_values,
-                    &normal_values,
-                    &row_filter_values,
-                    &normal_filter_values,
-                    &mut allocated,
-                )?;
+                if self
+                    .aggregation_ordering
+                    .as_ref()
+                    .map_or(false, |s| s.mode == GroupByOrderMode::Ordered)
+                {
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_aggr_input_values,
+                        &normal_aggr_input_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                } else {
+                    let row_values =
+                        get_at_indices(&row_aggr_input_values, &batch_indices)?;
+                    let normal_values =
+                        get_at_indices(&normal_aggr_input_values, &batch_indices)?;
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_values,
+                        &normal_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                };
             }
         }
         allocated += self
             .row_converter
             .size()
             .saturating_sub(row_converter_size_pre);
+
+        if self.aggregation_ordering.is_some() {
+            let mut new_result = false;
+            let last_ordered_columns = self
+                .aggr_state
+                .group_states
+                .last()
+                .map(|item| item.ordered_columns.clone());
+
+            if let Some(last_ordered_columns) = last_ordered_columns {

Review Comment:
   I may be misunderstanding this code, but it seems like it is tracking per group whether the group can be emitted. As I understand the `“Partial Streaming” / “Partitioned Streaming”` section of https://docs.google.com/document/d/16rm5VR1nGkY6DedMCh1NUmThwf3RduAweaBH9b1h6AY/edit#heading=h.uapxuhfa9wyi, the entire hash table could be flushed each time a new value of date is seen:
   
   ![Screenshot 2023-04-24 at 10 08 15 AM](https://user-images.githubusercontent.com/490673/234021622-3d2eb19f-153c-4ba8-889c-5efd99c80424.png)
   
   Perhaps with the obvious vectorization of checking only at record batch boundaries, or something similar.
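
The flush-on-new-value idea described in this comment can be sketched as follows. This is a minimal illustration under stated assumptions, not the code in this PR: `PartialStreamingSum` and its methods are hypothetical names, the input is assumed to be sorted by `date` only while the GROUP BY key is `(date, tag)`, and the check runs per row rather than per record batch.

```rust
use std::collections::HashMap;

/// Hypothetical partial-streaming SUM: input is sorted by `date` only,
/// while the GROUP BY key is `(date, tag)`.
struct PartialStreamingSum {
    /// The `date` value of the rows currently being accumulated.
    current_date: Option<i32>,
    /// Running sums for every group that shares `current_date`.
    groups: HashMap<(i32, String), i64>,
}

impl PartialStreamingSum {
    fn new() -> Self {
        Self {
            current_date: None,
            groups: HashMap::new(),
        }
    }

    /// Feed one row. When `date` differs from the previous row, no group
    /// with the old `date` can receive more rows (the input is sorted),
    /// so the whole table is flushed and returned.
    fn push(&mut self, date: i32, tag: &str, value: i64) -> Vec<((i32, String), i64)> {
        let emitted = if self.current_date.map_or(false, |d| d != date) {
            self.flush()
        } else {
            Vec::new()
        };
        self.current_date = Some(date);
        *self.groups.entry((date, tag.to_string())).or_insert(0) += value;
        emitted
    }

    /// Drain every group. Sorting is only for deterministic output here.
    fn flush(&mut self) -> Vec<((i32, String), i64)> {
        let mut out: Vec<_> = self.groups.drain().collect();
        out.sort();
        out
    }
}
```

A caller would invoke `push` for each incoming row and call `flush` once more at end of input to emit the groups of the last `date`. Checking only at record batch boundaries, as suggested above, would replace the per-row comparison with one comparison per batch.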
   
   



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -313,22 +331,144 @@ impl RecordBatchStream for GroupedHashAggregateStream {
     }
 }
 
+/// This utility object encapsulates the row object, the hash and the group
+/// indices for a group. This information is used when executing streaming
+/// GROUP BY calculations.
+struct GroupOrderInfo {
+    owned_row: OwnedRow,
+    hash: u64,
+    range: Range<usize>,
+}
+
 impl GroupedHashAggregateStream {
-    // Update the row_aggr_state according to groub_by values (result of group_by_expressions)
+    // Update the aggr_state according to groub_by values (result of group_by_expressions) when group by

Review Comment:
   minor nit: `groub_by` --> `group_by`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1520256950

   I got similar results in my next performance run:
   ```
   ****** TPCH SF1 (Parquet) ******
   + python3 /home/alamb/arrow-datafusion/benchmarks/compare.py /home/alamb/benchmarking/feature%2Fstream_groupby4/tpch_sf1_parquet_main.json /home/alamb/benchmarking/feature%2Fstream_groupby4/tpch_sf1_parquet_branch.json
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
   ┃ Query        ┃ /home/alamb… ┃ /home/alamb… ┃       Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
   │ QQuery 1     │    1439.84ms │    1472.45ms │    no change │
   │ QQuery 2     │     408.59ms │     428.13ms │    no change │
   │ QQuery 3     │     541.64ms │     550.25ms │    no change │
   │ QQuery 4     │     218.62ms │     227.71ms │    no change │
   │ QQuery 5     │     690.76ms │     702.12ms │    no change │
   │ QQuery 6     │     447.31ms │     456.74ms │    no change │
   │ QQuery 7     │    1223.94ms │    1262.33ms │    no change │
   │ QQuery 8     │     707.30ms │     731.55ms │    no change │
   │ QQuery 9     │    1331.92ms │    1309.76ms │    no change │
   │ QQuery 10    │     791.17ms │     813.92ms │    no change │
   │ QQuery 11    │     338.22ms │     338.76ms │    no change │
   │ QQuery 12    │     330.78ms │     329.38ms │    no change │
   │ QQuery 13    │    1102.45ms │    1134.98ms │    no change │
   │ QQuery 14    │     450.43ms │     447.24ms │    no change │
   │ QQuery 15    │     401.69ms │     407.17ms │    no change │
   │ QQuery 16    │     335.41ms │     370.62ms │ 1.10x slower │
   │ QQuery 17    │    2793.22ms │    2984.28ms │ 1.07x slower │
   │ QQuery 18    │    3602.06ms │    3855.55ms │ 1.07x slower │
   │ QQuery 19    │     757.20ms │     772.06ms │    no change │
   │ QQuery 20    │    1208.05ms │    1409.62ms │ 1.17x slower │
   │ QQuery 21    │    1662.33ms │    1672.38ms │    no change │
   │ QQuery 22    │     191.45ms │     195.55ms │    no change │
   └──────────────┴──────────────┴──────────────┴──────────────┘
   + echo '****** TPCH SF1 (mem) ******'
   ****** TPCH SF1 (mem) ******
   + python3 /home/alamb/arrow-datafusion/benchmarks/compare.py /home/alamb/benchmarking/feature%2Fstream_groupby4/tpch_sf1_mem_main.json /home/alamb/benchmarking/feature%2Fstream_groupby4/tpch_sf1_mem_branch.json
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃           -o ┃           -o ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 1     │     731.33ms │     759.80ms │     no change │
   │ QQuery 2     │     266.90ms │     315.18ms │  1.18x slower │
   │ QQuery 3     │     168.24ms │     181.07ms │  1.08x slower │
   │ QQuery 4     │     107.01ms │     107.07ms │     no change │
   │ QQuery 5     │     469.84ms │     472.69ms │     no change │
   │ QQuery 6     │      37.35ms │      38.33ms │     no change │
   │ QQuery 7     │    1119.56ms │    1121.01ms │     no change │
   │ QQuery 8     │     253.48ms │     258.02ms │     no change │
   │ QQuery 9     │     595.14ms │     604.48ms │     no change │
   │ QQuery 10    │     326.75ms │     343.41ms │  1.05x slower │
   │ QQuery 11    │     262.07ms │     293.92ms │  1.12x slower │
   │ QQuery 12    │     141.94ms │     151.47ms │  1.07x slower │
   │ QQuery 13    │     645.04ms │     698.31ms │  1.08x slower │
   │ QQuery 14    │      51.78ms │      48.35ms │ +1.07x faster │
   │ QQuery 15    │     100.32ms │     114.44ms │  1.14x slower │
   │ QQuery 16    │     240.47ms │     285.68ms │  1.19x slower │
   │ QQuery 17    │    2448.05ms │    2590.14ms │  1.06x slower │
   │ QQuery 18    │    3131.16ms │    3295.09ms │  1.05x slower │
   │ QQuery 19    │     147.72ms │     150.86ms │     no change │
   │ QQuery 20    │     931.54ms │    1052.21ms │  1.13x slower │
   │ QQuery 21    │    1454.51ms │    1438.62ms │     no change │
   │ QQuery 22    │     144.76ms │     141.84ms │     no change │
   └──────────────┴──────────────┴──────────────┴───────────────┘
   ```




[GitHub] [arrow-datafusion] yjshen commented on a diff in pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "yjshen (via GitHub)" <gi...@apache.org>.
yjshen commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1174509849


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -822,13 +930,127 @@ mod tests {
     use std::task::{Context, Poll};
 
     use super::StreamType;
+    use crate::physical_plan::aggregates::GroupByOrderMode::{Ordered, PartiallyOrdered};
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::{
         ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
         Statistics,
     };
     use crate::prelude::SessionContext;
 
+    // Generate a schema which consists of 5 columns (a, b, c, d, e)
+    fn create_test_schema() -> Result<SchemaRef> {
+        let a = Field::new("a", DataType::Int32, true);
+        let b = Field::new("b", DataType::Int32, true);
+        let c = Field::new("c", DataType::Int32, true);
+        let d = Field::new("d", DataType::Int32, true);
+        let e = Field::new("e", DataType::Int32, true);
+        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
+
+        Ok(schema)
+    }
+
+    // Created a sorted parquet exec
+    fn csv_exec_sorted(

Review Comment:
   This could be removed; there is an identical function, `crate::test::csv_exec_sorted`.



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -331,10 +408,32 @@ impl AggregateExec {
                     batch_size,
                     context,
                     partition,
+                    state,
                 )?,
             ))
         }
     }
+
+    fn calc_aggregate_state(&self) -> Option<AggregationOrdering> {

Review Comment:
   We have `AggregationState` in row_hash. The word `state` might be confusing in aggregate contexts.



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -663,12 +912,32 @@ impl std::fmt::Debug for AggregationState {
 }
 
 impl GroupedHashAggregateStream {
+    /// Prune the groups from the `self.aggr_state.group_states` which are in
+    /// `GroupStatus::Emitted`(this status means that result of this group emitted/outputted already, and
+    /// we are sure that these groups cannot receive new rows.) status.
+    fn prune(&mut self) {

Review Comment:
   This is smart!



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -591,38 +785,93 @@ impl GroupedHashAggregateStream {
                 }
                 let batch_indices = batch_indices.finish();
 
-                let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?;
-                let normal_values =
-                    get_at_indices(&normal_aggr_input_values, &batch_indices)?;
                 let row_filter_values =
                     get_optional_filters(&row_filter_values, &batch_indices);
                 let normal_filter_values =
                     get_optional_filters(&normal_filter_values, &batch_indices);
-                self.update_accumulators_using_batch(
-                    &groups_with_rows,
-                    &offsets,
-                    &row_values,
-                    &normal_values,
-                    &row_filter_values,
-                    &normal_filter_values,
-                    &mut allocated,
-                )?;
+                if self
+                    .state
+                    .as_ref()
+                    .map_or(false, |s| s.mode == GroupByOrderMode::Ordered)
+                {
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_aggr_input_values,
+                        &normal_aggr_input_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                } else {
+                    let row_values =
+                        get_at_indices(&row_aggr_input_values, &batch_indices)?;
+                    let normal_values =
+                        get_at_indices(&normal_aggr_input_values, &batch_indices)?;
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_values,
+                        &normal_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                };
             }
         }
         allocated += self
             .row_converter
             .size()
             .saturating_sub(row_converter_size_pre);
+
+        if self.state.is_some() {
+            let mut new_result = false;
+            let last_ordered_columns = self
+                .aggr_state
+                .group_states
+                .last()
+                .map(|item| item.ordered_columns.clone());
+
+            if let Some(last_ordered_columns) = last_ordered_columns {
+                for cur_group in &mut self.aggr_state.group_states {
+                    if cur_group.ordered_columns != last_ordered_columns {
+                        // We will no longer receive value. Set status to GroupStatus::CanEmit
+                        // meaning we can generate result for this group.
+                        cur_group.emit_status = GroupStatus::CanEmit;
+                        new_result = true;
+                    }
+                }
+            }
+            if new_result {
+                self.exec_state = ExecutionState::ProducingOutput;
+            }
+        }
+
         Ok(allocated)
     }
 }
 
+#[derive(Debug, PartialEq)]
+enum GroupStatus {
+    // `CannotEmit` means data for current group is not complete. New data may arrive.
+    CannotEmit,

Review Comment:
   `GroupProgress`, `Incomplete`, `ReadyToEmit` maybe.



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -822,13 +930,127 @@ mod tests {
     use std::task::{Context, Poll};
 
     use super::StreamType;
+    use crate::physical_plan::aggregates::GroupByOrderMode::{Ordered, PartiallyOrdered};
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::{
         ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
         Statistics,
     };
     use crate::prelude::SessionContext;
 
+    // Generate a schema which consists of 5 columns (a, b, c, d, e)
+    fn create_test_schema() -> Result<SchemaRef> {
+        let a = Field::new("a", DataType::Int32, true);
+        let b = Field::new("b", DataType::Int32, true);
+        let c = Field::new("c", DataType::Int32, true);
+        let d = Field::new("d", DataType::Int32, true);
+        let e = Field::new("e", DataType::Int32, true);
+        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
+
+        Ok(schema)
+    }
+
+    // Created a sorted parquet exec
+    fn csv_exec_sorted(
+        schema: &SchemaRef,
+        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+        infinite_source: bool,
+    ) -> Arc<dyn ExecutionPlan> {
+        let sort_exprs = sort_exprs.into_iter().collect();
+
+        Arc::new(CsvExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+                file_schema: schema.clone(),
+                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering: Some(sort_exprs),
+                infinite_source,
+            },
+            false,
+            0,
+            FileCompressionType::UNCOMPRESSED,
+        ))
+    }
+
+    /// make PhysicalSortExpr with default options
+    fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
+        sort_expr_options(name, schema, SortOptions::default())
+    }
+
+    /// PhysicalSortExpr with specified options
+    fn sort_expr_options(
+        name: &str,
+        schema: &Schema,
+        options: SortOptions,
+    ) -> PhysicalSortExpr {
+        PhysicalSortExpr {
+            expr: col(name, schema).unwrap(),
+            options,
+        }
+    }
+
+    #[tokio::test]
+    async fn test_get_working_mode() -> Result<()> {
+        let test_schema = create_test_schema()?;
+        // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC NULLS FIRST
+        // Column d, e is not ordered.
+        let sort_exprs = vec![
+            sort_expr("a", &test_schema),
+            sort_expr("b", &test_schema),
+            sort_expr("c", &test_schema),
+        ];
+        let input = csv_exec_sorted(&test_schema, sort_exprs, true);
+
+        // test cases consists of vector of tuples. Where each tuple represents a single test case.
+        // First field in the tuple is Vec<str> where each element in the vector represents GROUP BY columns
+        // For instance `vec!["a", "b"]` corresponds to GROUP BY a, b
+        // Second field in the tuple is Option<GroupByOrderMode>, which corresponds to expected algorithm mode.
+        // None represents that existing ordering is not sufficient to run executor with any one of the algorithms
+        // (We need to add SortExec to be able to run it).
+        // Some(GroupByOrderMode) represents, we can run algorithm with existing ordering; and algorithm should work in
+        // GroupByOrderMode.
+        let test_cases = vec![

Review Comment:
   👍



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -72,6 +74,15 @@ pub enum AggregateMode {
     Single,
 }
 
+/// Group By expression modes
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum GroupByOrderMode {
+    /// Some of the expressions in the GROUP BY clause have an ordering.
+    PartiallyOrdered,

Review Comment:
   `PartiallyOrdered` is left over for follow-up PRs, right?





[GitHub] [arrow-datafusion] alamb commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1518602558

   This PR looks really neat -- I will find time to review it either this weekend or early next week
   
   I changed its title to "streaming group by" so it might be clearer to others what this PR contains 




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1175324013


##########
datafusion-examples/examples/custom_datasource.rs:
##########
@@ -217,7 +217,7 @@ impl ExecutionPlan for CustomExec {
         datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+    fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>> {

Review Comment:
   ~Why does this signature need to change? The new signature requires an allocation / `clone` of a Vec where the previous one didn't and thus this seems to change the API for the worse.~
   
   Update: this is explained in the PR description:
   
   > This functionality requires us to calculate output_ordering dynamically. For this reason, this PR accompanies the api change from fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> to fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>>. To support dynamic calculation of the output ordering. See the corresponding https://github.com/synnada-ai/arrow-datafusion/pull/78 for more information.
   
   The linked PR https://github.com/synnada-ai/arrow-datafusion/pull/78#issue-1657633081 says that the issue is related to the AggregateExec changing during optimization, which I don't understand -- if the input changes then a new AggregateExec is also created
   
   





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1175324013


##########
datafusion-examples/examples/custom_datasource.rs:
##########
@@ -217,7 +217,7 @@ impl ExecutionPlan for CustomExec {
         datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+    fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>> {

Review Comment:
   ~Why does this signature need to change? The new signature requires an allocation / `clone` of a Vec where the previous one didn't and thus this seems to change the API for the worse.~
   
   Update: this is explained in the PR description:
   
   > This functionality requires us to calculate output_ordering dynamically. For this reason, this PR accompanies the api change from fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> to fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>>. To support dynamic calculation of the output ordering. See the corresponding https://github.com/synnada-ai/arrow-datafusion/pull/78 for more information.
   
   Perhaps we can calculate the output ordering once in the constructor rather than on demand.
   
   ```
   
       fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>> {
           self.calc_aggregation_ordering().map(|state| state.ordering)
       }
   ```
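
One way to read the "calculate once in the constructor" suggestion is sketched below. Everything here is a hypothetical stand-in: `SortExpr` replaces DataFusion's `PhysicalSortExpr`, `CachedOrderingExec` is not a real operator, and the rule used to derive the ordering (keep the leading input sort columns that are also GROUP BY columns) is a simplification chosen only for illustration.

```rust
/// Hypothetical stand-in for DataFusion's `PhysicalSortExpr`.
#[derive(Debug, Clone, PartialEq)]
struct SortExpr {
    column: String,
    descending: bool,
}

/// Hypothetical operator that derives its output ordering once, at construction.
struct CachedOrderingExec {
    ordering: Option<Vec<SortExpr>>,
}

impl CachedOrderingExec {
    fn new(input_ordering: Option<&[SortExpr]>, group_by: &[&str]) -> Self {
        // Simplified rule (an assumption): keep the leading input sort
        // columns that also appear in the GROUP BY list.
        let ordering = input_ordering
            .map(|exprs| {
                exprs
                    .iter()
                    .take_while(|e| group_by.contains(&e.column.as_str()))
                    .cloned()
                    .collect::<Vec<_>>()
            })
            .filter(|kept| !kept.is_empty());
        Self { ordering }
    }

    /// Borrowing accessor: no allocation or clone per call, so the
    /// slice-returning signature can be kept.
    fn output_ordering(&self) -> Option<&[SortExpr]> {
        self.ordering.as_deref()
    }
}
```

Because the ordering is stored in the struct, the accessor can hand out a borrowed slice, which is what the original `Option<&[PhysicalSortExpr]>` signature requires.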





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1174964125


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -331,10 +408,32 @@ impl AggregateExec {
                     batch_size,
                     context,
                     partition,
+                    state,
                 )?,
             ))
         }
     }
+
+    fn calc_aggregate_state(&self) -> Option<AggregationOrdering> {

Review Comment:
   Renamed it





[GitHub] [arrow-datafusion] ozankabak commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1520496156

   > For reference, here is the same benchmark run against `main` itself:
   
   This is very helpful. The variance seems larger than one expects.
   
   It seems there may be a tiny slow-down, with a magnitude of about half the noise variance (in high-cardinality cases?). @mustafasrepo and I just had a meeting to go over why that could be. He will respond explaining our theory in greater detail, answer your other questions, and maybe even suggest a fix/improvement.
   
   Based on the discussion afterwards and the final numbers, we can reach a consensus on whether we should have two impls with some code duplication, or use the current structure -- we will then take the necessary steps accordingly.
   
   Thanks for all the reviews!




[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1175568710


##########
datafusion-examples/examples/custom_datasource.rs:
##########
@@ -217,7 +217,7 @@ impl ExecutionPlan for CustomExec {
         datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+    fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>> {

Review Comment:
   > ~Why does this signature need to change? The new signature requires an allocation / `clone` of a Vec where the previous one didn't and thus this seems to change the API for the worse.~
   > 
   > Update: this is explained in the PR description:
   > 
   > > This functionality requires us to calculate output_ordering dynamically. For this reason, this PR accompanies the api change from fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> to fn output_ordering(&self) -> Option<Vec>. To support dynamic calculation of the output ordering. See the corresponding [synnada-ai#78](https://github.com/synnada-ai/arrow-datafusion/pull/78) for more information.
   > 
   > The linked PR [synnada-ai#78 (comment)](https://github.com/synnada-ai/arrow-datafusion/pull/78#issue-1657633081) says that the issue is related to the AggregateExec changing during optimization, which I don't understand -- if the input changes then a new AggregateExec is also created
   
   As you say, this approach may be unnecessary. Initially I thought that the information available when constructing the `AggregateExec` might change during planning, which would make us produce sub-optimal plans. However, when the input changes, a new `AggregateExec` is also created, so we have all the necessary information. I will discuss with our team whether we lose any functionality with the old API (probably not). After that discussion I will make the necessary changes. Thanks for pointing it out.





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1176177564


##########
datafusion-examples/examples/custom_datasource.rs:
##########
@@ -217,7 +217,7 @@ impl ExecutionPlan for CustomExec {
         datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
     }
 
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+    fn output_ordering(&self) -> Option<Vec<PhysicalSortExpr>> {

Review Comment:
   Our team agreed that this is unnecessary. I have reverted to the old API.





[GitHub] [arrow-datafusion] mingmwang commented on pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#issuecomment-1521587813

   Should we come up with a new physical requirement (a Grouping requirement) besides the Sort and Distribution requirements?
   A Grouping requirement can be satisfied in a very flexible manner: `Grouping(a, b, c)` can be satisfied by `Sort(a, b, c)` or `Sort(b, a, c)` or `Distribution(a, b, c)` or `Distribution(b, a, c)`, etc.
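
To make the flexibility concrete, here is a rough sketch of how a check for the Sort case might look. The function `sort_satisfies_grouping` is hypothetical and not part of DataFusion; it assumes plain, distinct column names and covers only the Sort side of the proposal, not Distribution.

```rust
use std::collections::HashSet;

/// Hypothetical check: does an input sorted by `sort_cols` keep rows with
/// equal `group_cols` values adjacent? Under the assumption of distinct
/// column names, this holds when the leading sort columns are some
/// permutation of the grouping columns.
fn sort_satisfies_grouping(group_cols: &[&str], sort_cols: &[&str]) -> bool {
    let wanted: HashSet<&str> = group_cols.iter().copied().collect();
    if sort_cols.len() < wanted.len() {
        return false;
    }
    // Compare the sort prefix with the grouping columns as sets, so the
    // order of columns inside the prefix does not matter.
    let prefix: HashSet<&str> = sort_cols[..wanted.len()].iter().copied().collect();
    wanted == prefix
}
```

With this sketch, `Grouping(a, b, c)` is reported as satisfied by both `Sort(a, b, c)` and `Sort(b, a, c)`, while `Grouping(a, c)` is not satisfied by `Sort(a, b, c)` because `b` interleaves the groups.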




[GitHub] [arrow-datafusion] mustafasrepo closed pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo closed pull request #6034: Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered
URL: https://github.com/apache/arrow-datafusion/pull/6034

