Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2024/03/11 20:17:06 UTC

[I] Generate GroupByHash output in multiple `RecordBatch`es rather than one large one [arrow-datafusion]

alamb opened a new issue, #9562:
URL: https://github.com/apache/arrow-datafusion/issues/9562

   ### Is your feature request related to a problem or challenge?
   
   1. `AggregateExec` generates a single (giant) `RecordBatch` on output ([source](https://github.com/apache/arrow-datafusion/blob/d5b635945307d5c7fe6fa10d3f65ee1ba2d58a5a/datafusion/physical-plan/src/aggregates/row_hash.rs#L660))
   2. That batch is then emitted in parts (via `RecordBatch::slice()`, which does not actually allocate any additional memory) ([source](https://github.com/apache/arrow-datafusion/blob/d5b635945307d5c7fe6fa10d3f65ee1ba2d58a5a/datafusion/physical-plan/src/aggregates/row_hash.rs#L492-L497))
   
   This has at least two potential downsides:
   1. No memory is freed until the GroupByHash has output every output row
   2. As we see in https://github.com/apache/arrow-datafusion/issues/9417, if there are upstream operators like TopK that hold references to any of these sliced `RecordBatch`es, those slices are treated as though they were an additional allocation that needs to be tracked ([source](https://github.com/apache/arrow-datafusion/blob/e642cc2a94f38518d765d25c8113523aedc29198/datafusion/physical-plan/src/topk/mod.rs#L576))
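
   To make the two downsides concrete, here is a minimal sketch in plain Rust with no Arrow dependency. `SlicedColumn` is a hypothetical stand-in for an array backed by a reference-counted buffer. It is not the arrow-rs API, only a model of the behaviour described above: slicing copies nothing, but every slice keeps the whole parent allocation alive.

   ```rust
   use std::sync::Arc;

   /// Toy model of a column backed by a shared buffer.
   /// Hypothetical type for illustration only, not the arrow-rs API.
   struct SlicedColumn {
       buffer: Arc<Vec<i64>>, // the single large allocation
       offset: usize,         // start of this window within the buffer
       len: usize,            // number of values in this window
   }

   impl SlicedColumn {
       fn new(values: Vec<i64>) -> Self {
           let len = values.len();
           Self { buffer: Arc::new(values), offset: 0, len }
       }

       /// Zero-copy slice: bumps the reference count, copies no values.
       fn slice(&self, offset: usize, len: usize) -> Self {
           assert!(offset + len <= self.len, "slice out of bounds");
           Self {
               buffer: Arc::clone(&self.buffer),
               offset: self.offset + offset,
               len,
           }
       }

       /// The values visible through this window.
       fn values(&self) -> &[i64] {
           &self.buffer[self.offset..self.offset + self.len]
       }

       /// Bytes kept alive by this handle: the whole buffer, not just the window.
       fn retained_bytes(&self) -> usize {
           self.buffer.len() * std::mem::size_of::<i64>()
       }

       /// Number of live handles that share the underlying buffer.
       fn shared_handles(&self) -> usize {
           Arc::strong_count(&self.buffer)
       }
   }
   ```

   In this model, a 5-row slice of a 1000-row column still reports all 8000 bytes as retained, and the buffer cannot be freed while any slice of it is alive.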
   
   Something like this in pictures:
   
   ```
                                                    Output             
                              ▲               RecordBatches are        
                              │                 slices into a          
                     ┌────────────────┐          single large          
                     │  RecordBatch   │─ ─ ─ ┐   output batch          
                     └────────────────┴ ─ ─ ┐                          
                              ▲              │                         
                              │             │        ┌────────────────┐
                     ┌────────────────┐      └ ─ ─ ─▶│                │
     output stream   │  RecordBatch   │     │        ├ ─ ─ ─ ─ ─ ─ ─ ─│
                     └────────────────┘      ─ ─ ─ ─▶│                │
                                                     ├ ─ ─ ─ ─ ─ ─ ─ ─│
                            ...                      │                │
                                                     │                │
                              ▲                      │      ...       │
                              │                      │                │
                     ┌────────────────┐              │                │
                     │  RecordBatch   ├ ─ ─ ┐        │                │
                     └────────────────┘              │                │
                              ▲             │        ├ ─ ─ ─ ─ ─ ─ ─ ─│
                              │              ─ ─ ─ ─▶│                │
                              │                      └────────────────┘
                              │                                        
                  ┏━━━━━━━━━━━━━━━━━━━━━━━┓                            
                  ┃                       ┃       Single RecordBatch   
                  ┃                       ┃                            
                  ┃                       ┃                            
                  ┃                       ┃                            
                  ┃                       ┃                            
                  ┃    GroupByHashExec    ┃                            
                  ┃                       ┃                            
                  ┃                       ┃                            
                  ┃                       ┃                            
                  ┃                       ┃                            
                  ┃                       ┃                            
                  ┃                       ┃                            
                  ┗━━━━━━━━━━━━━━━━━━━━━━━┛                            
   ```
   
   ### Describe the solution you'd like
   
   If we had infinite time / engineering hours, I think a better approach would actually be to change GroupByHash so it didn't create a single giant contiguous `RecordBatch`.
   
   Instead, it would be better if GroupByHash produced a `Vec<RecordBatch>` and then incrementally fed those batches out.
   
   Doing this would allow the GroupByHash to release memory incrementally as it produces output. This is analogous to how @korowa made join output incremental in https://github.com/apache/arrow-datafusion/pull/8658
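
   A rough sketch of the idea, again in plain Rust rather than against the real DataFusion types: `Batch` is a hypothetical stand-in for `RecordBatch`, and `into_batches` and `rows_held` are illustrative helpers, not existing functions. Each batch owns its own allocation, so handing one to the consumer lets that memory be freed independently of the rest.

   ```rust
   use std::collections::VecDeque;

   /// Toy stand-in for a `RecordBatch`: one column of owned values.
   type Batch = Vec<i64>;

   /// Split the aggregated rows into independently allocated batches of at
   /// most `batch_size` rows. (A real implementation would build the batches
   /// directly instead of copying out of one large vector.)
   fn into_batches(rows: Vec<i64>, batch_size: usize) -> VecDeque<Batch> {
       assert!(batch_size > 0, "batch_size must be non-zero");
       rows.chunks(batch_size).map(|chunk| chunk.to_vec()).collect()
   }

   /// Number of rows the operator is still holding on to.
   fn rows_held(pending: &VecDeque<Batch>) -> usize {
       pending.iter().map(Vec::len).sum()
   }
   ```

   Calling `pending.pop_front()` moves one batch out of the queue; once the consumer drops it, only the remaining batches are still held by the operator.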
   
   Perhaps something like
   
   ```
                               ▲                                        
                               │                                        
                      ┌────────────────┐                    Output      
     output stream    │  RecordBatch   │              RecordBatches are 
                      └────────────────┘              created in smaller
                               ▲                      chunks and emitted
                               │                          one by one    
                               │                                        
                               │                                        
                   ┏━━━━━━━━━━━━━━━━━━━━━━━┓          ┌────────────────┐
                   ┃                       ┃          │  RecordBatch   │
                   ┃                       ┃          └────────────────┘
                   ┃                       ┃          ┌────────────────┐
                   ┃                       ┃          │  RecordBatch   │
                   ┃                       ┃          └────────────────┘
                   ┃    GroupByHashExec    ┃          ┌────────────────┐
                   ┃                       ┃          │  RecordBatch   │
                   ┃                       ┃          └────────────────┘
                   ┃                       ┃                 ...        
                   ┃                       ┃          ┌────────────────┐
                   ┃                       ┃          │  RecordBatch   │
                   ┃                       ┃          └────────────────┘
                   ┗━━━━━━━━━━━━━━━━━━━━━━━┛                            
                                                       Vec<RecordBatch> 
                                                                        
                                                                        
   ```
   
   ### Describe alternatives you've considered
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [I] Generate GroupByHash output in multiple `RecordBatch`es rather than one large one [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on issue #9562:
URL: https://github.com/apache/arrow-datafusion/issues/9562#issuecomment-1992768884

   FYI this issue may be tricky -- as it will be performance critical -- I will be happy to assist


Re: [I] Generate GroupByHash output in multiple `RecordBatch`es rather than one large one [arrow-datafusion]

Posted by "guojidan (via GitHub)" <gi...@apache.org>.
guojidan commented on issue #9562:
URL: https://github.com/apache/arrow-datafusion/issues/9562#issuecomment-1991291959

   This issue is interesting; let me try to implement it.


Re: [I] Generate GroupByHash output in multiple `RecordBatch`es rather than one large one [arrow-datafusion]

Posted by "guojidan (via GitHub)" <gi...@apache.org>.
guojidan commented on issue #9562:
URL: https://github.com/apache/arrow-datafusion/issues/9562#issuecomment-2014643071

   This issue is a bit tricky for me 😢. I can only think of the following approach:
   change the `GroupedHashAggregateStream::emit()` function to return a vector, `Result<Vec<RecordBatch>>`, where each `RecordBatch` has `num_rows` equal to `GroupedHashAggregateStream::batch_size`, and change `ExecutionState::ProducingOutput(RecordBatch)` to `ExecutionState::ProducingOutput(Vec<RecordBatch>)`. Then the `GroupedHashAggregateStream::poll_next()` function returns one element of the vector per loop iteration, like `let output = batches.pop()` ([source](https://github.com/apache/arrow-datafusion/blob/main/datafusion/physical-plan/src/aggregates/row_hash.rs#L496))
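
   The state change could be sketched roughly as follows. This is a simplified, synchronous model and not the actual `row_hash.rs` code: `Batch` stands in for `RecordBatch`, `next_batch` is a hypothetical stand-in for the body of `poll_next()`, and the enum shows only the two states relevant here. One detail worth noting: `Vec::pop()` removes from the end, so it would emit batches in reverse order; the sketch uses a `VecDeque` and `pop_front()` to keep the original order.

   ```rust
   use std::collections::VecDeque;

   /// Toy stand-in for a `RecordBatch`.
   type Batch = Vec<i64>;

   /// Simplified execution state (assumed shape, for illustration only).
   enum ExecutionState {
       /// Batches that have been produced but not yet emitted.
       ProducingOutput(VecDeque<Batch>),
       /// All output has been emitted.
       Done,
   }

   /// Emit at most one batch per call, in the order it was produced.
   fn next_batch(state: &mut ExecutionState) -> Option<Batch> {
       match state {
           ExecutionState::ProducingOutput(batches) => {
               let batch = batches.pop_front();
               // Once the queue is drained, move on to the terminal state.
               if batches.is_empty() {
                   *state = ExecutionState::Done;
               }
               batch
           }
           ExecutionState::Done => None,
       }
   }
   ```

   Each call hands ownership of one batch to the caller, so the stream no longer holds that batch's memory after it is emitted.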


Re: [I] Generate GroupByHash output in multiple `RecordBatch`es rather than one large one [arrow-datafusion]

Posted by "guojidan (via GitHub)" <gi...@apache.org>.
guojidan commented on issue #9562:
URL: https://github.com/apache/arrow-datafusion/issues/9562#issuecomment-1991285476

   take


Re: [I] Generate GroupByHash output in multiple `RecordBatch`es rather than one large one [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on issue #9562:
URL: https://github.com/apache/arrow-datafusion/issues/9562#issuecomment-2015218264

   > This issue is a bit tricky for me 😢. I can only think of the following approach: change the `GroupedHashAggregateStream::emit()` function to return a vector, `Result<Vec<RecordBatch>>`, where each `RecordBatch` has `num_rows` equal to `GroupedHashAggregateStream::batch_size`, and change `ExecutionState::ProducingOutput(RecordBatch)` to `ExecutionState::ProducingOutput(Vec<RecordBatch>)`. Then the `GroupedHashAggregateStream::poll_next()` function returns one element of the vector per loop iteration, like `let output = batches.pop()` ([source](https://github.com/apache/arrow-datafusion/blob/main/datafusion/physical-plan/src/aggregates/row_hash.rs#L496))
   
   I think this approach sounds good -- nice proposal
   
   One thing that could help keep the PRs small and manageable would be to switch the APIs as described above, but avoid having to change all the `GroupsAccumulator`s in one PR by returning a `Vec<>` of size 1 for most of them.
   
   Then we can make subsequent PRs to switch over the other group accumulators as needed.
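
   One way this staged migration might look, sketched with a hypothetical toy trait rather than the real accumulator trait (the names `ToyGroupsAccumulator`, `evaluate_chunked`, `SumAccumulator` and `CountAccumulator` are all made up for illustration): the chunked method gets a default implementation that wraps the existing single result in a `Vec` of size 1, and individual accumulators override it later.

   ```rust
   /// Toy stand-in for an output array.
   type Column = Vec<i64>;

   /// Hypothetical trait for illustration; not the DataFusion API.
   trait ToyGroupsAccumulator {
       /// Existing-style API: all group results in one column.
       fn evaluate(&mut self) -> Column;

       /// New-style API: results as a list of chunks. The default wraps the
       /// single result in a `Vec` of size 1, so existing accumulators keep
       /// working unchanged.
       fn evaluate_chunked(&mut self, _batch_size: usize) -> Vec<Column> {
           vec![self.evaluate()]
       }
   }

   /// Not yet migrated: relies on the default `evaluate_chunked`.
   struct SumAccumulator {
       sums: Vec<i64>,
   }

   impl ToyGroupsAccumulator for SumAccumulator {
       fn evaluate(&mut self) -> Column {
           std::mem::take(&mut self.sums)
       }
   }

   /// Migrated in a follow-up change: emits chunks of at most `batch_size` rows.
   struct CountAccumulator {
       counts: Vec<i64>,
   }

   impl ToyGroupsAccumulator for CountAccumulator {
       fn evaluate(&mut self) -> Column {
           std::mem::take(&mut self.counts)
       }

       fn evaluate_chunked(&mut self, batch_size: usize) -> Vec<Column> {
           assert!(batch_size > 0, "batch_size must be non-zero");
           let all = std::mem::take(&mut self.counts);
           all.chunks(batch_size).map(|chunk| chunk.to_vec()).collect()
       }
   }
   ```

   With this shape, callers can switch to the chunked method immediately, and each accumulator can be converted in its own small PR.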

