Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/23 10:59:41 UTC

[GitHub] [arrow-datafusion] crepererum commented on issue #2723: Consolidate GroupByHash implementations `row_hash.rs` and `hash.rs` (remove duplication)

crepererum commented on issue #2723:
URL: https://github.com/apache/arrow-datafusion/issues/2723#issuecomment-1324876060

   # Grand Plan
   
   So @tustvold and I came up with a plan for this. Let me illustrate.
   
   
   ## Prior Art
   Let's first quickly illustrate what the current state is and what the problem is. We have two group-by implementations:
   
   ### V1 (`hash.rs`)
   This is the original, fully-featured implementation. It uses `Box<[ScalarValue]>` as the group key and one `Box<dyn Accumulator>` per group to perform the aggregation and store the per-group state. `hash.rs` manages the entire group->state mapping. The grouping is calculated once per record batch, then a `take` kernel reshuffles the batch and we pass slices of the result to the individual `Accumulator`s.
   
   While this is nice from a flexibility PoV, this is also very slow, because:
   
   - `Box<[ScalarValue]>` is not exactly fast to compute
   - There's one dynamic-dispatched object per group
   
   V1 currently does NOT support dynamic memory tracking, see #3940.
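
   To make the V1 shape concrete, here is a minimal sketch using only the standard library. The `ScalarValue` enum, the `Accumulator` trait and `sum_by_group_v1` are simplified stand-ins invented for illustration, not the real DataFusion definitions, and the index gather stands in for the `take` kernel. It only shows where the per-group boxing happens:

   ```rust
   use std::collections::HashMap;

   // Simplified stand-in for DataFusion's `ScalarValue` (assumption, not the real enum).
   #[derive(Clone, Debug, PartialEq, Eq, Hash)]
   enum ScalarValue {
       Int64(i64),
       Utf8(String),
   }

   // Simplified stand-in for the `Accumulator` trait (assumption: i64 values only).
   trait Accumulator {
       fn update_batch(&mut self, values: &[i64]);
       fn evaluate(&self) -> i64;
   }

   #[derive(Default)]
   struct SumAccumulator {
       sum: i64,
   }

   impl Accumulator for SumAccumulator {
       fn update_batch(&mut self, values: &[i64]) {
           self.sum += values.iter().sum::<i64>();
       }
       fn evaluate(&self) -> i64 {
           self.sum
       }
   }

   // V1-style aggregation of a single batch: one boxed key and one boxed,
   // dynamically dispatched accumulator per group.
   fn sum_by_group_v1(
       keys: &[Vec<ScalarValue>],
       values: &[i64],
   ) -> HashMap<Box<[ScalarValue]>, i64> {
       // Grouping is computed once per batch: group key -> row indices.
       let mut rows_per_group: HashMap<Box<[ScalarValue]>, Vec<usize>> = HashMap::new();
       for (row, key) in keys.iter().enumerate() {
           rows_per_group
               .entry(key.clone().into_boxed_slice())
               .or_default()
               .push(row);
       }

       let mut groups: HashMap<Box<[ScalarValue]>, Box<dyn Accumulator>> = HashMap::new();
       for (key, rows) in rows_per_group {
           // Stand-in for the `take` kernel: gather this group's values contiguously.
           let taken: Vec<i64> = rows.iter().map(|&r| values[r]).collect();
           groups
               .entry(key)
               .or_insert_with(|| Box::new(SumAccumulator::default()) as Box<dyn Accumulator>)
               .update_batch(&taken);
       }
       groups.into_iter().map(|(k, acc)| (k, acc.evaluate())).collect()
   }

   // Three rows, two distinct group keys.
   fn example_v1() -> HashMap<Box<[ScalarValue]>, i64> {
       let a = vec![ScalarValue::Utf8("a".to_string()), ScalarValue::Int64(1)];
       let b = vec![ScalarValue::Utf8("b".to_string()), ScalarValue::Int64(2)];
       sum_by_group_v1(&[a.clone(), b, a], &[1, 10, 3])
   }
   ```

   Every distinct key costs one boxed slice of scalars plus one trait object, which is the overhead listed above.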
   
   ### V2 (`row_hash.rs`)
   This uses a row format ... or two to be precise. The group key is stored as DataFusion *compact* row format. There's one `Box<dyn RowAccumulator>` per aggregation target (NOT per group) and the per-group state is stored in the DataFusion *word-aligned* row format. `row_hash.rs` manages the mapping from group key to state and passes a `RowAccessor` to the `RowAccumulator`. The processing is similar to V1: groups are calculated once per batch, then a `take` kernel reshuffles everything and slices are passed to the `RowAccumulator`.
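
   A rough sketch of the V2 shape, again with the standard library only. The `RowAccessor` and `RowAccumulator` below are hypothetical, heavily simplified stand-ins (fixed 8-byte slots, `i64` values, byte vectors as keys), not the real DataFusion types. The point is that there is one accumulator per aggregation target and that all per-group state lives in a fixed-width byte row owned by the caller:

   ```rust
   use std::collections::HashMap;

   // Hypothetical stand-in for a view over one group's fixed-width state row.
   struct RowAccessor<'a> {
       data: &'a mut [u8],
   }

   impl<'a> RowAccessor<'a> {
       fn get_i64(&self, slot: usize) -> i64 {
           let mut buf = [0u8; 8];
           buf.copy_from_slice(&self.data[slot * 8..slot * 8 + 8]);
           i64::from_le_bytes(buf)
       }
       fn set_i64(&mut self, slot: usize, value: i64) {
           self.data[slot * 8..slot * 8 + 8].copy_from_slice(&value.to_le_bytes());
       }
   }

   // Simplified stand-in for `RowAccumulator`: it holds no per-group state itself.
   trait RowAccumulator {
       fn update_batch(&mut self, values: &[i64], accessor: &mut RowAccessor<'_>);
   }

   struct SumRowAccumulator {
       slot: usize,
   }

   impl RowAccumulator for SumRowAccumulator {
       fn update_batch(&mut self, values: &[i64], accessor: &mut RowAccessor<'_>) {
           let sum = accessor.get_i64(self.slot) + values.iter().sum::<i64>();
           accessor.set_i64(self.slot, sum);
       }
   }

   struct CountRowAccumulator {
       slot: usize,
   }

   impl RowAccumulator for CountRowAccumulator {
       fn update_batch(&mut self, values: &[i64], accessor: &mut RowAccessor<'_>) {
           let count = accessor.get_i64(self.slot) + values.len() as i64;
           accessor.set_i64(self.slot, count);
       }
   }

   // Returns (SUM, COUNT) for the single group with key `[1]`.
   fn example_v2() -> (i64, i64) {
       // One accumulator per aggregation target: SUM -> slot 0, COUNT -> slot 1.
       let mut accumulators: Vec<Box<dyn RowAccumulator>> = Vec::new();
       accumulators.push(Box::new(SumRowAccumulator { slot: 0 }));
       accumulators.push(Box::new(CountRowAccumulator { slot: 1 }));

       // Group key bytes -> fixed-width state row (two 8-byte slots).
       let mut states: HashMap<Vec<u8>, Vec<u8>> = HashMap::new();

       // Two already-grouped slices that belong to the same group key.
       let slices = [(vec![1u8], vec![1i64, 2]), (vec![1u8], vec![3i64])];
       for (key, values) in &slices {
           let state = states.entry(key.clone()).or_insert_with(|| vec![0u8; 16]);
           let mut accessor = RowAccessor { data: state.as_mut_slice() };
           for acc in accumulators.iter_mut() {
               acc.update_batch(values, &mut accessor);
           }
       }

       let mut state = states.remove(&vec![1u8]).unwrap();
       let accessor = RowAccessor { data: state.as_mut_slice() };
       (accessor.get_i64(0), accessor.get_i64(1))
   }
   ```

   Because every slot has a fixed width, an aggregation whose state grows with the input has no place to put it in this layout.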
   
   V2 is faster than V1, but in our opinion the reasoning is blurry: there are two differences (group key calculation and state storage), and we believe that especially the first one is responsible for the improvement.
   
   The DF *word-aligned* row format will never work for some aggregations, such as an exact (non-approximate) median, whose state requires dynamic allocation. So this is somewhat of a dead end.
   
   The good thing is that V2 supports memory tracking (see #3940).
   
   ### Row Formats
   See #4179. TL;DR: we have 3 row formats in our stack and we should focus on one.
   
   ## Proposal
   We think that V2 has some good ideas but will never get finished and could also be improved. As weird as it may sound (and please read the implementation plan before arguing against it) we propose a V3.
   
   ### Design
   Use the arrow row format as the group key. I (= @crepererum ) have some early benchmarks suggesting that this change alone would improve V2 performance. Ditch the *word-aligned* row format for the state management and change the aggregator to:
   
   ```rust
   trait Aggregator {
       /// Update aggregator state for given groups.
       ///
       /// Return allocation size change (positive for additional allocated bytes, negative for freed bytes).
       fn update_batch(&mut self, keys_and_values: &[(OwnedRow, Vec<ArrayRef>)]) -> Result<isize>;
       ...
   }
   ```
   
   Hence the aggregator will be dyn-dispatched ONCE per record batch and will keep its own internal state. This moves the key->state map from `[row_]hash.rs` into the aggregators. We will provide tooling (macros or generics) to simplify the implementation and to avoid boilerplate code as much as possible.
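
   As an illustration of what "keeps its own internal state" could look like, here is a minimal sketch of a sum aggregator behind the proposed interface. To stay self-contained it makes several assumptions: `OwnedRow` is replaced by a plain `Vec<u8>`, `Vec<ArrayRef>` by a single `Vec<i64>` column, `Result` by a `String`-error alias, and the `evaluate` method and the byte accounting are invented for the example and are not part of the proposal:

   ```rust
   use std::collections::HashMap;
   use std::mem::size_of;

   // Stand-ins (assumptions): the proposal uses arrow's `OwnedRow` and `Vec<ArrayRef>`.
   type OwnedRow = Vec<u8>;
   type Result<T> = std::result::Result<T, String>;

   trait Aggregator {
       /// Update aggregator state for given groups.
       ///
       /// Return allocation size change (positive for additional allocated bytes).
       fn update_batch(&mut self, keys_and_values: &[(OwnedRow, Vec<i64>)]) -> Result<isize>;

       /// Hypothetical accessor for the example; not part of the proposed trait.
       fn evaluate(&self, key: &OwnedRow) -> Option<i64>;
   }

   // The aggregator owns the key -> state map itself.
   #[derive(Default)]
   struct SumAggregator {
       state: HashMap<OwnedRow, i64>,
   }

   impl Aggregator for SumAggregator {
       fn update_batch(&mut self, keys_and_values: &[(OwnedRow, Vec<i64>)]) -> Result<isize> {
           let mut delta: isize = 0;
           for (key, values) in keys_and_values {
               let batch_sum: i64 = values.iter().sum();
               if let Some(sum) = self.state.get_mut(key) {
                   *sum += batch_sum;
               } else {
                   // Rough estimate only: key bytes plus one i64 of state.
                   // It ignores hash map overhead and spare capacity.
                   delta += (key.len() + size_of::<i64>()) as isize;
                   self.state.insert(key.clone(), batch_sum);
               }
           }
           Ok(delta)
       }

       fn evaluate(&self, key: &OwnedRow) -> Option<i64> {
           self.state.get(key).copied()
       }
   }

   // Returns (bytes reported by call 1, bytes reported by call 2, sum for key [1]).
   fn example_v3() -> (isize, isize, Option<i64>) {
       // A single dynamic dispatch per call, regardless of the number of groups.
       let mut agg: Box<dyn Aggregator> = Box::new(SumAggregator::default());
       let first = agg
           .update_batch(&[(vec![1u8], vec![1i64, 2]), (vec![2u8], vec![10i64])])
           .unwrap();
       let second = agg.update_batch(&[(vec![1u8], vec![3i64])]).unwrap();
       (first, second, agg.evaluate(&vec![1u8]))
   }
   ```

   Since the map is private to the aggregator, an implementation is free to store a growable buffer per group, which the fixed-width state row cannot express.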
   
   
   ### Implementation Plan
   
   1. **arrow row format prep:** Fill the gaps that the arrow row format currently has:
      - **size estimation:** So we can hook it into the DF memory management
      - **types:** Support all the arrow types (except for very few that are so weird that nobody cares)
   2. **row format use:** Use the arrow row format as a group key in V2. This will strictly allow more group-bys to pass through V2 because we now allow more group key types.
   3. **V2 state management:** Change V2 state management to be like the V3 version above. We don't want to migrate all accumulators in one go, so we'll provide an adapter that exposes the new `Aggregator` interface described above and uses the `RowAccumulator` internally. This may cause a performance regression for the current V2, but we expect it to be barely noticeable, if at all; it may even improve performance due to reduced dyn dispatch. Feature support for V2 will stay the same.
   4. **aggregator migration:** Migrate one aggregator at a time to the new interface. Notice that in contrast to the V2 `RowAccumulator`, the V3 proposal is flexible enough to support all aggregations (even the ones that require dynamic allocations).
   5. **remove V1:** V2 is now basically V3 but with full feature support. Delete V1.
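
   The adapter from step 3 could look roughly like the following sketch. All names and signatures here are assumptions made for illustration: `RowAccumulatorAdapter` and `state_size` are hypothetical, `OwnedRow` is a plain `Vec<u8>`, values are a single `Vec<i64>` column, and the state row is a raw byte buffer rather than the real *word-aligned* format:

   ```rust
   use std::collections::HashMap;

   // Stand-ins (assumptions), as simple as possible.
   type OwnedRow = Vec<u8>;
   type Result<T> = std::result::Result<T, String>;

   // Old-style interface (simplified): state lives in a fixed-width buffer
   // that is owned by the caller.
   trait RowAccumulator {
       /// Hypothetical helper: number of state bytes needed per group.
       fn state_size(&self) -> usize;
       fn update_batch(&mut self, values: &[i64], state: &mut [u8]);
   }

   // New-style interface (simplified): the aggregator owns key -> state.
   trait Aggregator {
       fn update_batch(&mut self, keys_and_values: &[(OwnedRow, Vec<i64>)]) -> Result<isize>;
   }

   // Wraps an unmigrated `RowAccumulator` and keeps the state rows for it.
   struct RowAccumulatorAdapter<A: RowAccumulator> {
       inner: A,
       states: HashMap<OwnedRow, Vec<u8>>,
   }

   impl<A: RowAccumulator> Aggregator for RowAccumulatorAdapter<A> {
       fn update_batch(&mut self, keys_and_values: &[(OwnedRow, Vec<i64>)]) -> Result<isize> {
           let mut delta: isize = 0;
           for (key, values) in keys_and_values {
               if !self.states.contains_key(key) {
                   let size = self.inner.state_size();
                   // Rough estimate: key bytes plus the zero-initialized state row.
                   delta += (key.len() + size) as isize;
                   self.states.insert(key.clone(), vec![0u8; size]);
               }
               let state = self.states.get_mut(key).expect("state row exists");
               self.inner.update_batch(values, state);
           }
           Ok(delta)
       }
   }

   struct SumRowAccumulator;

   impl RowAccumulator for SumRowAccumulator {
       fn state_size(&self) -> usize {
           8
       }
       fn update_batch(&mut self, values: &[i64], state: &mut [u8]) {
           let mut buf = [0u8; 8];
           buf.copy_from_slice(&state[..8]);
           let sum = i64::from_le_bytes(buf) + values.iter().sum::<i64>();
           state[..8].copy_from_slice(&sum.to_le_bytes());
       }
   }

   // Returns (bytes reported by call 1, bytes reported by call 2, final sum).
   fn example_adapter() -> (isize, isize, i64) {
       let mut adapter = RowAccumulatorAdapter {
           inner: SumRowAccumulator,
           states: HashMap::new(),
       };
       let first = adapter.update_batch(&[(vec![7u8], vec![1i64, 2])]).unwrap();
       let second = adapter.update_batch(&[(vec![7u8], vec![3i64])]).unwrap();
       let mut buf = [0u8; 8];
       buf.copy_from_slice(&adapter.states[&vec![7u8]][..8]);
       (first, second, i64::from_le_bytes(buf))
   }
   ```

   With such a wrapper the caller only ever sees `Aggregator`, so accumulators can be migrated one at a time behind it.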

