Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/14 10:36:50 UTC

[GitHub] [arrow-datafusion] crepererum opened a new pull request, #4202: wire memory management into `GroupedHashAggregateStreamV2`

crepererum opened a new pull request, #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202

   # Which issue does this PR close?
   Doesn't close it, but works towards #3940 (V1 still needs to be migrated as well).
   
   # Rationale for this change
   Ensure that users don't run out of memory while performing group-by operations. This is especially important for servers and multi-tenant systems.
   
   # What changes are included in this PR?
   - small cleanup regarding async usage (first commit)
   - use a nested construct (`BoxStream`) for `GroupedHashAggregateStreamV2` so we can call into the async memory manager (I considered NOT doing this, but I think it's worth it because in the long run a group-by may ask another spillable operation to spill)
   
   # Are these changes tested?
   - new test (`test_oom`)
   - perf results (see below)
   
   Perf results:
   
   ```text
   ❯ cargo bench -p datafusion --bench aggregate_query_sql -- --baseline issue3940a-pre
       Finished bench [optimized] target(s) in 0.08s
        Running benches/aggregate_query_sql.rs (target/release/deps/aggregate_query_sql-e9e315ab7a06a262)
   aggregate_query_no_group_by 15 12
                           time:   [654.77 µs 655.49 µs 656.29 µs]
                           change: [-1.6711% -1.2910% -0.8435%] (p = 0.00 < 0.05)
                           Change within noise threshold.
   Found 9 outliers among 100 measurements (9.00%)
     1 (1.00%) low mild
     5 (5.00%) high mild
     3 (3.00%) high severe
   
   aggregate_query_no_group_by_min_max_f64
                           time:   [579.93 µs 580.59 µs 581.27 µs]
                           change: [-3.8985% -3.2219% -2.6198%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 9 outliers among 100 measurements (9.00%)
     1 (1.00%) low severe
     3 (3.00%) low mild
     1 (1.00%) high mild
     4 (4.00%) high severe
   
   aggregate_query_no_group_by_count_distinct_wide
                           time:   [2.4610 ms 2.4801 ms 2.4990 ms]
                           change: [-2.9300% -1.8414% -0.7493%] (p = 0.00 < 0.05)
                           Change within noise threshold.
   
   Benchmarking aggregate_query_no_group_by_count_distinct_narrow: Warming up for 3.0000 s
   Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.4s, enable flat sampling, or reduce sample count to 50.
   aggregate_query_no_group_by_count_distinct_narrow
                           time:   [1.6578 ms 1.6661 ms 1.6743 ms]
                           change: [-4.5391% -3.5033% -2.5050%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 7 outliers among 100 measurements (7.00%)
     1 (1.00%) low severe
     2 (2.00%) low mild
     2 (2.00%) high mild
     2 (2.00%) high severe
   
   aggregate_query_group_by
                           time:   [2.1767 ms 2.2045 ms 2.2486 ms]
                           change: [-4.1048% -2.5858% -0.3237%] (p = 0.00 < 0.05)
                           Change within noise threshold.
   Found 1 outliers among 100 measurements (1.00%)
     1 (1.00%) high severe
   
   Benchmarking aggregate_query_group_by_with_filter: Warming up for 3.0000 s
   Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.5s, enable flat sampling, or reduce sample count to 60.
   aggregate_query_group_by_with_filter
                           time:   [1.0916 ms 1.0927 ms 1.0941 ms]
                           change: [-0.8524% -0.4230% -0.0724%] (p = 0.02 < 0.05)
                           Change within noise threshold.
   Found 9 outliers among 100 measurements (9.00%)
     2 (2.00%) low severe
     1 (1.00%) low mild
     4 (4.00%) high mild
     2 (2.00%) high severe
   
   aggregate_query_group_by_u64 15 12
                           time:   [2.2108 ms 2.2238 ms 2.2368 ms]
                           change: [-4.2142% -3.2743% -2.3523%] (p = 0.00 < 0.05)
                           Performance has improved.
   
   Benchmarking aggregate_query_group_by_with_filter_u64 15 12: Warming up for 3.0000 s
   Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.5s, enable flat sampling, or reduce sample count to 60.
   aggregate_query_group_by_with_filter_u64 15 12
                           time:   [1.0922 ms 1.0931 ms 1.0940 ms]
                           change: [-0.6872% -0.3192% +0.1193%] (p = 0.12 > 0.05)
                           No change in performance detected.
   Found 7 outliers among 100 measurements (7.00%)
     3 (3.00%) low mild
     4 (4.00%) high severe
   
   aggregate_query_group_by_u64_multiple_keys
                           time:   [14.714 ms 15.023 ms 15.344 ms]
                           change: [-5.8337% -2.7471% +0.2798%] (p = 0.09 > 0.05)
                           No change in performance detected.
   
   aggregate_query_approx_percentile_cont_on_u64
                           time:   [3.7776 ms 3.8049 ms 3.8329 ms]
                           change: [-4.4977% -3.4230% -2.3282%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 2 outliers among 100 measurements (2.00%)
     2 (2.00%) high mild
   
   aggregate_query_approx_percentile_cont_on_f32
                           time:   [3.1769 ms 3.1997 ms 3.2230 ms]
                           change: [-4.4664% -3.2597% -2.0955%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 1 outliers among 100 measurements (1.00%)
     1 (1.00%) high mild
   ```
   
   I think the mild improvements are either noise or due to the somewhat
   manual memory allocation pattern.
   
   
   # Are there any user-facing changes?
   The V2 group-by operator can now emit a `ResourcesExhausted` error if it runs out of memory. Note that the error is somewhat nested/wrapped due to #4172.
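A minimal, single-threaded sketch of the user-visible behavior: a memory budget that refuses to grow past its limit and reports `ResourcesExhausted` instead of letting the process run out of memory. `MemoryBudget` and the local `DataFusionError` enum are hypothetical stand-ins written for illustration; DataFusion's real `MemoryManager` is async and shared between many consumers.

```rust
use std::fmt;

/// Hypothetical stand-in for DataFusion's error type; only the variant
/// relevant here is modelled.
#[derive(Debug, PartialEq)]
pub enum DataFusionError {
    ResourcesExhausted(String),
}

impl fmt::Display for DataFusionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DataFusionError::ResourcesExhausted(msg) => {
                write!(f, "Resources exhausted: {msg}")
            }
        }
    }
}

/// Hypothetical single-threaded memory budget (not DataFusion's MemoryManager).
pub struct MemoryBudget {
    limit: usize,
    used: usize,
}

impl MemoryBudget {
    pub fn new(limit: usize) -> Self {
        Self { limit, used: 0 }
    }

    /// Reserve `bytes` more, or fail without changing the budget.
    pub fn try_grow(&mut self, bytes: usize) -> Result<(), DataFusionError> {
        match self.used.checked_add(bytes) {
            Some(new_used) if new_used <= self.limit => {
                self.used = new_used;
                Ok(())
            }
            _ => Err(DataFusionError::ResourcesExhausted(format!(
                "cannot grow by {bytes} bytes: {} of {} bytes in use",
                self.used, self.limit
            ))),
        }
    }

    pub fn used(&self) -> usize {
        self.used
    }
}
```

A failed `try_grow` leaves the budget unchanged, so the caller can surface the error (or spill) without corrupting the accounting.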


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] crepererum commented on a diff in pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
crepererum commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1024216048


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -418,6 +487,130 @@ impl std::fmt::Debug for AggregationState {
     }
 }
 
+/// Accounting data structure for memory usage.
+struct AggregationStateMemoryConsumer {
+    /// Consumer ID.
+    id: MemoryConsumerId,
+
+    /// Linked memory manager.
+    memory_manager: Arc<MemoryManager>,
+
+    /// Currently used size in bytes.
+    used: usize,
+}
+
+#[async_trait]
+impl MemoryConsumer for AggregationStateMemoryConsumer {
+    fn name(&self) -> String {
+        "AggregationState".to_owned()
+    }
+
+    fn id(&self) -> &crate::execution::MemoryConsumerId {
+        &self.id
+    }
+
+    fn memory_manager(&self) -> Arc<MemoryManager> {
+        Arc::clone(&self.memory_manager)
+    }
+
+    fn type_(&self) -> &ConsumerType {
+        &ConsumerType::Tracking
+    }
+
+    async fn spill(&self) -> Result<usize> {
+        Err(DataFusionError::ResourcesExhausted(
+            "Cannot spill AggregationState".to_owned(),
+        ))
+    }
+
+    fn mem_used(&self) -> usize {
+        self.used
+    }
+}
+
+impl AggregationStateMemoryConsumer {
+    async fn alloc(&mut self, bytes: usize) -> Result<()> {
+        self.try_grow(bytes).await?;
+        self.used = self.used.checked_add(bytes).expect("overflow");
+        Ok(())
+    }
+}
+
+impl Drop for AggregationStateMemoryConsumer {
+    fn drop(&mut self) {
+        self.memory_manager
+            .drop_consumer(self.id(), self.mem_used());
+    }
+}
+
+trait VecAllocExt {
+    type T;
+
+    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);
+}
+
+impl<T> VecAllocExt for Vec<T> {
+    type T = T;
+
+    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize) {
+        if self.capacity() == self.len() {
+            // allocate more
+
+            // growth factor: 2, but at least 2 elements
+            let bump_elements = (self.capacity() * 2).max(2);
+            let bump_size = std::mem::size_of::<T>() * bump_elements;
+            self.reserve(bump_elements);
+            *accounting = (*accounting).checked_add(bump_size).expect("overflow");
+        }
+
+        self.push(x);
+    }
+}
+
+trait RawTableAllocExt {
+    type T;
+
+    fn insert_accounted(
+        &mut self,
+        x: Self::T,
+        hasher: impl Fn(&Self::T) -> u64,

Review Comment:
   Long-term, I wish Rust would stabilize the [`Allocator` trait](https://doc.rust-lang.org/std/alloc/trait.Allocator.html) so we could plug it into the data structures and measure their usage directly (no need to guess).
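Until the `Allocator` trait is stable, one way to measure rather than guess for a `Vec` is to compare `capacity()` before and after each `push` and charge the difference, sized with the element type `T` (the quoted diff uses `size_of::<u32>()` even though the trait is generic over `T`). This also sidesteps predicting how much `reserve` will allocate: `Vec::reserve(n)` guarantees room for at least `n` *additional* elements and may allocate more. The sketch below is an assumption about how such a helper could look, not the code in this PR; like the quoted version, it counts capacity, not allocator overhead.

```rust
use std::mem::size_of;

/// Extension trait for pushing into a `Vec` while tracking allocated bytes.
pub trait VecAllocExt {
    type T;

    /// Push `x` and add any capacity growth, in bytes, to `accounting`.
    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);
}

impl<T> VecAllocExt for Vec<T> {
    type T = T;

    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize) {
        let prev_capacity = self.capacity();
        // Let `Vec` apply its own growth strategy...
        self.push(x);
        let new_capacity = self.capacity();
        // ...and charge only what was actually reserved, using `T`'s size.
        if new_capacity > prev_capacity {
            let bump_size = (new_capacity - prev_capacity) * size_of::<T>();
            *accounting = (*accounting).checked_add(bump_size).expect("overflow");
        }
    }
}
```

For a vector that starts empty, the accumulated total always equals `capacity() * size_of::<T>()`, whatever growth factor the standard library picks.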





[GitHub] [arrow-datafusion] crepererum commented on a diff in pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
crepererum commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1024094500


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -418,6 +487,130 @@ impl std::fmt::Debug for AggregationState {
+        hasher: impl Fn(&Self::T) -> u64,

Review Comment:
   Sure, memory accounting is ALWAYS coupled to the data structures that are used.





[GitHub] [arrow-datafusion] milenkovicm commented on a diff in pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
milenkovicm commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1024088334


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -418,6 +487,130 @@ impl std::fmt::Debug for AggregationState {
+        hasher: impl Fn(&Self::T) -> u64,

Review Comment:
   This couples the accounting to the current implementation. For example, what if we decide to keep the state in a B-tree rather than a hash map (we need it sorted for spilling)?
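One way to address this coupling, sketched under assumptions: put a small trait in front of the state container so that each implementation reports an estimate of its own heap usage, and let the caller compare the estimate before and after a batch. `GroupStore`, `HashStore`, `SortedStore` and `insert_batch` are hypothetical names, and the size estimates are deliberately rough (`BTreeMap` exposes no capacity, so its estimate is based on the entry count only).

```rust
use std::collections::{BTreeMap, HashMap};
use std::mem::size_of;

/// Hypothetical abstraction over the group-state container.
pub trait GroupStore {
    fn insert(&mut self, key: u64, value: u64);
    /// Rough heap-usage estimate in bytes (ignores allocator/node overhead).
    fn allocated_size(&self) -> usize;
}

pub struct HashStore(pub HashMap<u64, u64>);
pub struct SortedStore(pub BTreeMap<u64, u64>);

impl GroupStore for HashStore {
    fn insert(&mut self, key: u64, value: u64) {
        self.0.insert(key, value);
    }
    fn allocated_size(&self) -> usize {
        self.0.capacity() * size_of::<(u64, u64)>()
    }
}

impl GroupStore for SortedStore {
    fn insert(&mut self, key: u64, value: u64) {
        self.0.insert(key, value);
    }
    fn allocated_size(&self) -> usize {
        // BTreeMap has no capacity(); estimate from the number of entries.
        self.0.len() * size_of::<(u64, u64)>()
    }
}

/// Caller-side accounting: process a batch, then report the growth once.
pub fn insert_batch(store: &mut dyn GroupStore, keys: &[u64]) -> usize {
    let before = store.allocated_size();
    for &k in keys {
        store.insert(k, k);
    }
    store.allocated_size().saturating_sub(before)
}
```

Swapping the hash map for a B-tree then only touches the `GroupStore` implementation; the accounting in `insert_batch` stays the same.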





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1023140383


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -418,6 +529,104 @@ impl std::fmt::Debug for AggregationState {
     }
 }
 
+/// Accounting data structure for memory usage.
+struct AggregationStateMemoryConsumer {
+    /// Consumer ID.
+    id: MemoryConsumerId,
+
+    /// Linked memory manager.
+    memory_manager: Arc<MemoryManager>,
+
+    /// Currently used size in bytes.
+    used: usize,
+}
+
+#[async_trait]
+impl MemoryConsumer for AggregationStateMemoryConsumer {
+    fn name(&self) -> String {
+        "AggregationState".to_owned()
+    }
+
+    fn id(&self) -> &crate::execution::MemoryConsumerId {
+        &self.id
+    }
+
+    fn memory_manager(&self) -> Arc<MemoryManager> {
+        Arc::clone(&self.memory_manager)
+    }
+
+    fn type_(&self) -> &ConsumerType {
+        &ConsumerType::Tracking
+    }
+
+    async fn spill(&self) -> Result<usize> {
+        Err(DataFusionError::ResourcesExhausted(
+            "Cannot spill AggregationState".to_owned(),
+        ))
+    }
+
+    fn mem_used(&self) -> usize {
+        self.used
+    }
+}
+
+impl Drop for AggregationStateMemoryConsumer {
+    fn drop(&mut self) {
+        self.memory_manager
+            .drop_consumer(self.id(), self.mem_used());
+    }
+}
+
+/// Memory pool that can be used in a function scope.
+///
+/// This is helpful if there are many small memory allocations (so the overhead if tracking them in [`MemoryManager`] is
+/// high due to lock contention) and pre-calculating the entire allocation for a whole [`RecordBatch`] is complicated or
+/// expensive.
+///
+/// The pool will try to allocate a whole block of memory and gives back overallocated memory on [drop](Self::drop).

Review Comment:
   👌 
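The pooling idea in the doc comment above can be sketched as follows, assuming a simple single-threaded budget: reserve one block up front, serve many small allocations locally without touching the shared budget, top up when the block runs out, and return only the unused remainder on drop (the used part stays accounted because the data structures still hold it). `Budget` and `ScopedPool` are hypothetical names; the pool in this PR works against `MemoryManager` instead.

```rust
use std::cell::Cell;

/// Hypothetical shared budget; `Cell` keeps this single-threaded sketch simple.
pub struct Budget {
    limit: usize,
    used: Cell<usize>,
}

impl Budget {
    pub fn new(limit: usize) -> Self {
        Self { limit, used: Cell::new(0) }
    }
    fn try_reserve(&self, bytes: usize) -> bool {
        let new_used = self.used.get().saturating_add(bytes);
        if new_used <= self.limit {
            self.used.set(new_used);
            true
        } else {
            false
        }
    }
    fn release(&self, bytes: usize) {
        self.used.set(self.used.get() - bytes);
    }
    pub fn used(&self) -> usize {
        self.used.get()
    }
}

/// Function-scoped pool: one up-front reservation, local bookkeeping after that.
pub struct ScopedPool<'a> {
    budget: &'a Budget,
    reserved: usize,
    used: usize,
}

impl<'a> ScopedPool<'a> {
    pub fn new(budget: &'a Budget, block: usize) -> Option<Self> {
        budget
            .try_reserve(block)
            .then(|| Self { budget, reserved: block, used: 0 })
    }

    /// Take `bytes` from the local block, topping up from the budget if needed.
    pub fn alloc(&mut self, bytes: usize) -> bool {
        let needed = (self.used + bytes).saturating_sub(self.reserved);
        if needed > 0 {
            if !self.budget.try_reserve(needed) {
                return false;
            }
            self.reserved += needed;
        }
        self.used += bytes;
        true
    }
}

impl Drop for ScopedPool<'_> {
    fn drop(&mut self) {
        // Give back only the over-allocated part of the block.
        self.budget.release(self.reserved - self.used);
    }
}
```

The shared budget is touched once per block (plus rare top-ups) rather than once per small allocation, which is the lock-contention argument made in the doc comment.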



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -70,6 +72,16 @@ use hashbrown::raw::RawTable;
 /// [Compact]: datafusion_row::layout::RowType::Compact
 /// [WordAligned]: datafusion_row::layout::RowType::WordAligned
 pub(crate) struct GroupedHashAggregateStreamV2 {

Review Comment:
   This looks very much like other stream adapters we have in DataFusion. Perhaps we can give it a more general name, such as `SendableRecordBatchStreamWrapper`, and put it in
   
   https://github.com/apache/arrow-datafusion/blob/c9361e0210861962074eb10d7e480949bb862b97/datafusion/core/src/physical_plan/stream.rs#L34
   
   We can always do this as a follow-on PR as well.



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -270,10 +315,25 @@ fn group_aggregate_batch(
                 // Existing entry for this group value
                 Some((_hash, group_idx)) => {
                     let group_state = &mut group_states[*group_idx];
+
                     // 1.3
                     if group_state.indices.is_empty() {
                         groups_with_rows.push(*group_idx);
                     };
+
+                    // ensure we have enough indices allocated
+                    if group_state.indices.capacity() == group_state.indices.len() {
+                        // allocate more
+
+                        // growth factor: 2, but at least 2 elements
+                        let bump_elements = (group_state.indices.capacity() * 2).max(2);

Review Comment:
   I wonder if we could somehow encapsulate the memory manager interactions into functions on `GroupAggrState` rather than treating it like a plain struct. I don't think that is strictly necessary.
   
   However, encapsulating might:
   1. Keep this code manageable for future readers
   2. Allow the memory allocation routines to be unit tested (like that when new groups are added that the memory allocation is incremented correctly)
   
   
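A sketch of the kind of encapsulation suggested here, assuming the per-group state only needs to track row indices: the method returns how many bytes of capacity it added, which makes the accounting unit-testable in isolation. `GroupState::push_index` and `allocated_bytes` are hypothetical names.

```rust
use std::mem::size_of;

/// Hypothetical per-group state: only the row indices are modelled.
#[derive(Default)]
pub struct GroupState {
    indices: Vec<u32>,
}

impl GroupState {
    /// Record a row index and return how many bytes the index buffer grew by,
    /// so the caller can charge a memory manager once per batch.
    pub fn push_index(&mut self, row: u32) -> usize {
        let before = self.indices.capacity();
        self.indices.push(row);
        (self.indices.capacity() - before) * size_of::<u32>()
    }

    /// Bytes currently reserved by the index buffer.
    pub fn allocated_bytes(&self) -> usize {
        self.indices.capacity() * size_of::<u32>()
    }
}
```

A unit test then only has to check an invariant: for a state that started empty, the sum of all `push_index` return values equals `allocated_bytes()`.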



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -270,10 +315,25 @@ fn group_aggregate_batch(
                 // Existing entry for this group value
                 Some((_hash, group_idx)) => {
                     let group_state = &mut group_states[*group_idx];
+
                     // 1.3
                     if group_state.indices.is_empty() {
                         groups_with_rows.push(*group_idx);
                     };
+
+                    // ensure we have enough indices allocated
+                    if group_state.indices.capacity() == group_state.indices.len() {
+                        // allocate more
+
+                        // growth factor: 2, but at least 2 elements

Review Comment:
   Growth factors like this are sometimes capped at some large value (like 1G) to avoid the 2x memory overhead at large allocation sizes.
   
   If we use 2x growth with no cap, you can get into situations where the table would fit in 36GB, but the code tries to grow from 32GB to 64GB and hits the limit even though the query could have completed. This could be handled in a follow-on PR; users can always disable the memory manager, let the allocations happen, and risk OOMs if they want the current behavior.
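A capped growth policy of the kind described above might look like this sketch: double while small, but never add more than `max_step` in one step. The function name and the 1 GiB cap are assumptions for illustration; with `Vec`, such a target would have to be requested via `reserve_exact`, which still allows the allocator to hand out more.

```rust
/// Next buffer size in bytes: double small buffers, but never grow by more
/// than `max_step` bytes at once. `max_step` is assumed to be non-zero.
pub fn next_capacity(current: u64, max_step: u64) -> u64 {
    // Start from at least 1 byte so that an empty buffer can grow at all.
    let step = current.max(1).min(max_step);
    current.saturating_add(step)
}
```

With a 1 GiB cap, a table at 32 GiB that needs 36 GiB grows in four 1 GiB steps instead of requesting 64 GiB in one go; the trade-off is more reallocations (and copies) once buffers are large.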
   





[GitHub] [arrow-datafusion] crepererum commented on a diff in pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
crepererum commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1023844082


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -270,10 +315,25 @@ fn group_aggregate_batch(
                 // Existing entry for this group value
                 Some((_hash, group_idx)) => {
                     let group_state = &mut group_states[*group_idx];
+
                     // 1.3
                     if group_state.indices.is_empty() {
                         groups_with_rows.push(*group_idx);
                     };
+
+                    // ensure we have enough indices allocated
+                    if group_state.indices.capacity() == group_state.indices.len() {
+                        // allocate more
+
+                        // growth factor: 2, but at least 2 elements
+                        let bump_elements = (group_state.indices.capacity() * 2).max(2);

Review Comment:
   > I wonder if we could somehow encapsulate the memory manager interactions into functions on `GroupAggrState` rather than treating it like a struct.
   
   That only works if all interactions with `GroupState` go through methods, not just a few of them, due to how Rust handles borrowing (`fn f(&self)` and `fn f(&mut self)` borrow the whole struct, so you cannot mutably borrow any member at the same time).
   
   > What if `group_aggregate_batch` returns how much more memory it allocated, and accounting is done after method call? ... Also, this way end of the batch would be a "safe point" at which we could trigger spill
   
   Fair, let me try that.
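A sketch of both ideas together, under simplifying assumptions (one `u64` group key per row, hash-map growth not counted): a free function takes the individual fields instead of `&mut self`, so the map and the group vector can be borrowed mutably at the same time, and it returns how many bytes of capacity it added so the caller can account once per batch. `AggrState`, `group_batch` and `update` are hypothetical names, loosely modelled on `group_aggregate_batch`.

```rust
use std::collections::HashMap;
use std::mem::size_of;

/// Heavily simplified aggregation state (hypothetical).
#[derive(Default)]
pub struct AggrState {
    /// Group key -> index into `group_states`.
    pub map: HashMap<u64, usize>,
    /// Row indices collected per group.
    pub group_states: Vec<Vec<u32>>,
}

/// Takes the fields separately instead of `&mut self`, so both can be
/// borrowed mutably at once. Returns the bytes of capacity added to
/// `group_states` and the per-group index vectors (map growth not counted).
pub fn group_batch(
    map: &mut HashMap<u64, usize>,
    group_states: &mut Vec<Vec<u32>>,
    keys: &[u64],
) -> usize {
    let mut grown = 0;
    for (row, key) in keys.iter().enumerate() {
        let idx = match map.get(key) {
            Some(&idx) => idx,
            None => {
                // New group: append an empty index list and remember its slot.
                let before = group_states.capacity();
                group_states.push(Vec::new());
                grown += (group_states.capacity() - before) * size_of::<Vec<u32>>();
                map.insert(*key, group_states.len() - 1);
                group_states.len() - 1
            }
        };
        let indices = &mut group_states[idx];
        let before = indices.capacity();
        indices.push(row as u32);
        grown += (indices.capacity() - before) * size_of::<u32>();
    }
    grown
}

impl AggrState {
    /// Process one batch and account the growth once, at the batch boundary.
    pub fn update(&mut self, keys: &[u64], used_bytes: &mut usize) {
        // Borrowing two fields mutably at once is fine; a `&mut self` helper
        // could not be called while either borrow is held.
        *used_bytes += group_batch(&mut self.map, &mut self.group_states, keys);
    }
}
```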





[GitHub] [arrow-datafusion] milenkovicm commented on a diff in pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
milenkovicm commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1024181640


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -138,11 +167,78 @@ impl GroupedHashAggregateStreamV2 {
             aggr_layout,
             baseline_metrics,
             aggregate_expressions,
-            aggr_state: Default::default(),
+            aggr_state,
             random_state: Default::default(),
             batch_size,
             row_group_skip_position: 0,
-        })
+        };
+
+        let stream = futures::stream::unfold(inner, |mut this| async move {
+            let elapsed_compute = this.baseline_metrics.elapsed_compute();
+
+            loop {
+                let result: ArrowResult<Option<RecordBatch>> =
+                    match this.input.next().await {
+                        Some(Ok(batch)) => {
+                            let timer = elapsed_compute.timer();
+                            let result = group_aggregate_batch(
+                                &this.mode,
+                                &this.random_state,
+                                &this.group_by,
+                                &mut this.accumulators,
+                                &this.group_schema,
+                                this.aggr_layout.clone(),
+                                batch,
+                                &mut this.aggr_state,
+                                &this.aggregate_expressions,
+                            )
+                            .await;
+
+                            timer.done();
+
+                            match result {
+                                Ok(_) => continue,

Review Comment:
   Apologies, you're right @crepererum, it is per batch.
   
   The reason I believe moving it out makes sense is separation of concerns, but it's up to you.
   
   For example, at line 363:
   
   ```rust
           // allocate memory
           // This happens AFTER we actually used the memory, but simplifies the whole accounting and we are OK with
           // overshooting a bit. Also this means we either store the whole record batch or not.
           memory_consumer.alloc(allocated).await?;
   ```
   Can this trigger a spill? Will the state be consistent if a spill is triggered? My guess is that it will not be. It might be implementation-specific, but it is hard to tell without understanding the memory manager and state store implementations.





[GitHub] [arrow-datafusion] milenkovicm commented on a diff in pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
milenkovicm commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1024085041


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -138,11 +167,78 @@ impl GroupedHashAggregateStreamV2 {
+                                Ok(_) => continue,

Review Comment:
   IMHO, this would be the place to do something like:
   
   ```rust
    Ok(_) => {
       let new_data_size = this.aggr_state.get_current_size();
       let acquired = this.memory_manager.can_grow_directly(new_data_size - data_size_before_batch, data_size_before_batch);
       if !acquired {
           this.aggr_state.spill();
           this.memory_manager.record_free_then_acquire(data_size, 0);
       }
       continue;
   }
   ```
   We basically assume that `group_aggregate_batch` can get all the memory it needs, so there is no need for per-row interaction with the memory manager.
   
   This would decouple processing and accounting.
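The suggestion above can be sketched with a synchronous, single-threaded stand-in: apply the whole batch first, then request the growth in one call at the batch boundary, and if the request is refused, spill the (now consistent) state and release its reservation. `SharedBudget`, `Aggregator` and the in-memory `spilled_runs` (standing in for spill files) are hypothetical; as in the PR, memory is used before it is accounted, so the operator can overshoot by the growth of one batch.

```rust
use std::mem::size_of;

/// Hypothetical memory budget shared with other operators.
pub struct SharedBudget {
    pub limit: usize,
    pub used: usize,
}

impl SharedBudget {
    /// Reserve `bytes` if they fit under the limit; otherwise change nothing.
    pub fn try_grow(&mut self, bytes: usize) -> bool {
        match self.used.checked_add(bytes) {
            Some(new_used) if new_used <= self.limit => {
                self.used = new_used;
                true
            }
            _ => false,
        }
    }

    pub fn shrink(&mut self, bytes: usize) {
        self.used -= bytes;
    }
}

/// Hypothetical operator: buffered group keys plus runs spilled "to disk".
#[derive(Default)]
pub struct Aggregator {
    pub groups: Vec<u64>,
    /// Bytes currently reserved from the budget for `groups`.
    pub accounted: usize,
    /// Sorted runs; stands in for spill files.
    pub spilled_runs: Vec<Vec<u64>>,
}

impl Aggregator {
    fn size(&self) -> usize {
        self.groups.capacity() * size_of::<u64>()
    }

    pub fn process_batch(&mut self, batch: &[u64], budget: &mut SharedBudget) {
        // 1. Apply the whole batch without per-row budget calls.
        self.groups.extend_from_slice(batch);

        // 2. Batch boundary: the state is consistent, request the growth once.
        let delta = self.size().saturating_sub(self.accounted);
        if budget.try_grow(delta) {
            self.accounted += delta;
            return;
        }

        // 3. Growth refused: spill a sorted run and release the reservation.
        let mut run = std::mem::take(&mut self.groups);
        run.sort_unstable();
        self.spilled_runs.push(run);
        budget.shrink(self.accounted);
        self.accounted = 0;
    }
}
```

Because the budget is only consulted after a batch has been fully applied, a spill never sees a half-updated state, which is the consistency concern raised in the earlier comment.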





[GitHub] [arrow-datafusion] milenkovicm commented on pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
milenkovicm commented on PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#issuecomment-1317709499

   > I think this is looking great
   > 
   > 
   > 
   > https://github.com/apache/arrow-datafusion/pull/4202/files?w=1 shows the diff clearly
   > 
   > 
   > 
   > What are your thoughts @milenkovicm ?
   
   I think @crepererum did a fine job here.
   
   I'm not sure whether he will move
   ```
   memory_consumer.alloc(allocated).await?;
   ```
   to just before the return statement; otherwise it is spot on.
   




[GitHub] [arrow-datafusion] milenkovicm commented on a diff in pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
milenkovicm commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1024188618


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -418,6 +487,130 @@ impl std::fmt::Debug for AggregationState {
+        hasher: impl Fn(&Self::T) -> u64,

Review Comment:
   My bad, @crepererum, ignore my comment; apologies.
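The consumer in the diff above tracks its own `used` bytes and hands them back to the manager in `Drop`, so a finished or failed aggregation cannot leak its reservation. A minimal std-only sketch of that RAII pattern follows. `Manager` and `TrackingConsumer` are hypothetical simplifications with a fixed byte limit, not DataFusion's `MemoryManager`/`MemoryConsumer` API, and the sketch is synchronous whereas the real `alloc` is `async`.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical shared manager: a fixed limit and the bytes currently reserved.
pub struct Manager {
    limit: usize,
    reserved: usize,
}

impl Manager {
    pub fn new(limit: usize) -> Rc<RefCell<Manager>> {
        Rc::new(RefCell::new(Manager { limit, reserved: 0 }))
    }

    pub fn reserved(&self) -> usize {
        self.reserved
    }
}

/// Hypothetical tracking consumer: it cannot spill, so growing past the
/// limit is reported as an error instead.
pub struct TrackingConsumer {
    manager: Rc<RefCell<Manager>>,
    used: usize,
}

impl TrackingConsumer {
    pub fn new(manager: Rc<RefCell<Manager>>) -> Self {
        TrackingConsumer { manager, used: 0 }
    }

    pub fn used(&self) -> usize {
        self.used
    }

    /// Reserve `bytes` more; on failure nothing is updated.
    pub fn alloc(&mut self, bytes: usize) -> Result<(), String> {
        let mut m = self.manager.borrow_mut();
        let new_total = m.reserved.checked_add(bytes).expect("overflow");
        if new_total > m.limit {
            return Err(format!(
                "Resources exhausted: cannot reserve {bytes} bytes ({} of {} in use)",
                m.reserved, m.limit
            ));
        }
        m.reserved = new_total;
        self.used = self.used.checked_add(bytes).expect("overflow");
        Ok(())
    }
}

impl Drop for TrackingConsumer {
    /// Hand everything this consumer reserved back to the manager.
    fn drop(&mut self) {
        self.manager.borrow_mut().reserved -= self.used;
    }
}
```

Because the limit check runs before either counter is updated, a rejected `alloc` leaves both the consumer and the manager unchanged.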





[GitHub] [arrow-datafusion] milenkovicm commented on a diff in pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
milenkovicm commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1023810458


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -270,10 +315,25 @@ fn group_aggregate_batch(
                 // Existing entry for this group value
                 Some((_hash, group_idx)) => {
                     let group_state = &mut group_states[*group_idx];
+
                     // 1.3
                     if group_state.indices.is_empty() {
                         groups_with_rows.push(*group_idx);
                     };
+
+                    // ensure we have enough indices allocated
+                    if group_state.indices.capacity() == group_state.indices.len() {
+                        // allocate more
+
+                        // growth factor: 2, but at least 2 elements
+                        let bump_elements = (group_state.indices.capacity() * 2).max(2);

Review Comment:
   I tend to agree with @alamb here: IMHO `group_aggregate_batch` is too busy at the moment, and some separation of concerns would help.
   
   What if `group_aggregate_batch` returns how much additional memory it allocated, and the accounting is done after the method call? This would help encapsulate the aggregation algorithm and make it easier to swap. I'm aware that it might not produce 100% correct results, but as we discussed in #3941, it is OK to have a small discrepancy for a short period of time.
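A minimal sketch of that split, assuming a simplified state (a `HashMap` from group key to the row indices of the current batch): the hypothetical `aggregate_batch` only reports how many bytes of index storage it newly allocated, and a hypothetical caller-side `Budget` is charged once per batch. The byte count covers only the `Vec<u32>` capacities, not the hash map itself.

```rust
use std::collections::HashMap;
use std::mem::size_of;

/// Hypothetical aggregation step: append row indices per group key and
/// return how many bytes of index storage were newly allocated.
/// It knows nothing about memory limits.
pub fn aggregate_batch(groups: &mut HashMap<u64, Vec<u32>>, keys: &[u64]) -> usize {
    let mut allocated = 0;
    for (row, &key) in keys.iter().enumerate() {
        let indices = groups.entry(key).or_default();
        let before = indices.capacity();
        indices.push(row as u32);
        // Capacity never shrinks on push, so this difference is >= 0.
        allocated += (indices.capacity() - before) * size_of::<u32>();
    }
    allocated
}

/// Hypothetical caller-side budget, charged once per batch.
pub struct Budget {
    pub limit: usize,
    pub used: usize,
}

impl Budget {
    pub fn charge(&mut self, bytes: usize) -> Result<(), String> {
        if self.used.saturating_add(bytes) > self.limit {
            return Err(format!("memory limit of {} bytes exceeded", self.limit));
        }
        self.used += bytes;
        Ok(())
    }
}
```

If `charge` fails, the batch has already been applied to `groups`; that is the brief overshoot the comment accepts in exchange for keeping the aggregation code free of accounting.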





[GitHub] [arrow-datafusion] crepererum commented on a diff in pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
crepererum commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1023869485


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -270,10 +315,25 @@ fn group_aggregate_batch(
                 // Existing entry for this group value
                 Some((_hash, group_idx)) => {
                     let group_state = &mut group_states[*group_idx];
+
                     // 1.3
                     if group_state.indices.is_empty() {
                         groups_with_rows.push(*group_idx);
                     };
+
+                    // ensure we have enough indices allocated
+                    if group_state.indices.capacity() == group_state.indices.len() {
+                        // allocate more
+
+                        // growth factor: 2, but at least 2 elements
+                        let bump_elements = (group_state.indices.capacity() * 2).max(2);

Review Comment:
   Done.
   
   Let me know if this looks better. I will pull out and document all the helper structs and traits when I port V1 (I want at least a second consumer so I can make sure the interface makes sense).
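As an illustration of such a helper, here is a generic sketch of the `push_accounted` extension from the diff quoted earlier in this thread (trait and method names are reused from the diff; the body is an assumption, not the merged code). It differs from the quoted hunk in two ways: it charges `size_of::<T>()` per element instead of `size_of::<u32>()`, so it stays correct for element types other than `u32`, and it charges the capacity actually gained, because `Vec::reserve` only guarantees a lower bound.

```rust
use std::mem::size_of;

/// Extension trait: push an element and add any newly allocated heap
/// bytes to `accounting`.
pub trait VecAllocExt {
    type T;
    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);
}

impl<T> VecAllocExt for Vec<T> {
    type T = T;

    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize) {
        if self.capacity() == self.len() {
            // Request room for 2x the current capacity more (at least 2 elements).
            let bump_elements = (self.capacity() * 2).max(2);
            let before = self.capacity();
            self.reserve(bump_elements);
            // `reserve` may allocate more than requested, so charge the
            // capacity actually gained, sized by the real element type.
            let gained = (self.capacity() - before) * size_of::<T>();
            *accounting = (*accounting).checked_add(gained).expect("overflow");
        }
        // Capacity is now > len, so this push does not reallocate.
        self.push(x);
    }
}
```

Starting from an empty `Vec`, the accumulated total equals `capacity() * size_of::<T>()`, since every reallocation goes through the accounted `reserve`.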





[GitHub] [arrow-datafusion] crepererum commented on a diff in pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
crepererum commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1023839803


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -270,10 +315,25 @@ fn group_aggregate_batch(
                 // Existing entry for this group value
                 Some((_hash, group_idx)) => {
                     let group_state = &mut group_states[*group_idx];
+
                     // 1.3
                     if group_state.indices.is_empty() {
                         groups_with_rows.push(*group_idx);
                     };
+
+                    // ensure we have enough indices allocated
+                    if group_state.indices.capacity() == group_state.indices.len() {
+                        // allocate more
+
+                        // growth factor: 2, but at least 2 elements

Review Comment:
   IIRC `Vec` doesn't cap its growth either, so at least this isn't a regression:
   
   https://github.com/rust-lang/rust/blob/e702534763599db252f2ca308739ec340d0933de/library/alloc/src/raw_vec.rs#L372
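For reference, `Vec::reserve(additional)` guarantees capacity for at least `len + additional` elements. Since the diff bumps only when `len == capacity` and requests `2 * capacity` more, each bump leaves a capacity of at least three times the old one, and growth stays geometric and uncapped. The hypothetical `capacity_steps` helper below replays that rule on a `Vec<u8>` and records every capacity it reaches.

```rust
/// Replay the "bump when full" rule from the diff while pushing `n` bytes,
/// returning the capacity observed after each bump.
pub fn capacity_steps(n: usize) -> Vec<usize> {
    let mut v: Vec<u8> = Vec::new();
    let mut steps = Vec::new();
    for _ in 0..n {
        if v.capacity() == v.len() {
            // Same rule as the diff: 2x current capacity, but at least 2.
            let bump = (v.capacity() * 2).max(2);
            v.reserve(bump);
            steps.push(v.capacity());
        }
        v.push(0);
    }
    steps
}
```

The exact values are an implementation detail of the standard library (a small `Vec<u8>` may jump straight to a capacity larger than 2); only the lower bound of three times the previous capacity is guaranteed.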





[GitHub] [arrow-datafusion] crepererum commented on a diff in pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
crepererum commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1024099248


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -138,11 +167,78 @@ impl GroupedHashAggregateStreamV2 {
             aggr_layout,
             baseline_metrics,
             aggregate_expressions,
-            aggr_state: Default::default(),
+            aggr_state,
             random_state: Default::default(),
             batch_size,
             row_group_skip_position: 0,
-        })
+        };
+
+        let stream = futures::stream::unfold(inner, |mut this| async move {
+            let elapsed_compute = this.baseline_metrics.elapsed_compute();
+
+            loop {
+                let result: ArrowResult<Option<RecordBatch>> =
+                    match this.input.next().await {
+                        Some(Ok(batch)) => {
+                            let timer = elapsed_compute.timer();
+                            let result = group_aggregate_batch(
+                                &this.mode,
+                                &this.random_state,
+                                &this.group_by,
+                                &mut this.accumulators,
+                                &this.group_schema,
+                                this.aggr_layout.clone(),
+                                batch,
+                                &mut this.aggr_state,
+                                &this.aggregate_expressions,
+                            )
+                            .await;
+
+                            timer.done();
+
+                            match result {
+                                Ok(_) => continue,

Review Comment:
   The interaction is not per row; it's per batch. I can place the accounting here. The code you propose is basically the same as what currently runs, just inlined (it's the default implementation of `MemoryConsumer::try_grow`).
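To make the per-batch flow concrete, here is a simplified, synchronous sketch of a grow-or-spill policy of the kind a default `try_grow` can follow: reserve directly when there is headroom, otherwise ask the consumer to spill first. The `Consumer` trait, `MemError`, and `Tracking` are hypothetical, and the exact DataFusion logic is not reproduced here; the only behavior taken from the PR is that the aggregation state cannot spill, so its `spill` returns a `ResourcesExhausted` error that surfaces from `try_grow`.

```rust
/// Hypothetical error type standing in for DataFusion's `ResourcesExhausted`.
#[derive(Debug, PartialEq)]
pub enum MemError {
    ResourcesExhausted(String),
}

/// Simplified, synchronous stand-in for a memory consumer.
pub trait Consumer {
    /// Bytes still available to this consumer.
    fn available(&self) -> usize;
    /// Record `bytes` as reserved.
    fn record(&mut self, bytes: usize);
    /// Free memory by spilling; returns the number of bytes freed.
    fn spill(&mut self) -> Result<usize, MemError>;

    /// Default policy: grow directly if possible, otherwise spill first
    /// and propagate the spill error if spilling is unsupported.
    fn try_grow(&mut self, bytes: usize) -> Result<(), MemError> {
        if bytes > self.available() {
            self.spill()?;
            if bytes > self.available() {
                return Err(MemError::ResourcesExhausted(format!(
                    "cannot reserve {bytes} bytes even after spilling"
                )));
            }
        }
        self.record(bytes);
        Ok(())
    }
}

/// Tracking-only consumer, like `AggregationStateMemoryConsumer`: no spilling.
pub struct Tracking {
    pub limit: usize,
    pub used: usize,
}

impl Consumer for Tracking {
    fn available(&self) -> usize {
        self.limit - self.used
    }
    fn record(&mut self, bytes: usize) {
        self.used += bytes;
    }
    fn spill(&mut self) -> Result<usize, MemError> {
        Err(MemError::ResourcesExhausted(
            "Cannot spill AggregationState".to_owned(),
        ))
    }
}
```

In the PR this check runs once per input batch inside the stream loop, not once per row, so its cost is amortized over the whole batch.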





[GitHub] [arrow-datafusion] alamb merged pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
alamb merged PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202




[GitHub] [arrow-datafusion] ursabot commented on pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
ursabot commented on PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#issuecomment-1320477104

   Benchmark runs are scheduled for baseline = 09e1c9148514a87e3083d3644370d36f2e9fb87d and contender = f3a65c74442fa42770418684a71a09ad9bcc348c. f3a65c74442fa42770418684a71a09ad9bcc348c is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/215fbf69605f466a807ce990cf614e26...adb03fa12479433caac07b1cbaa734e2/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] [test-mac-arm](https://conbench.ursa.dev/compare/runs/538cfe30045143428a3eedab7411d50c...ea186a6a7c3e4e11a6775856f0edce61/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/4c0ab6d6135f49758eaae8e4c98ec959...88b4915d1bfa4fcd8a766b5efbc1e2d4/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/4a25a7062a93494f8f57f6af45dc8193...47f465170a2e4768a4e05727c80d1044/)
   Buildkite builds:
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   




[GitHub] [arrow-datafusion] milenkovicm commented on a diff in pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
milenkovicm commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1023824326


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -270,10 +315,25 @@ fn group_aggregate_batch(
                 // Existing entry for this group value
                 Some((_hash, group_idx)) => {
                     let group_state = &mut group_states[*group_idx];
+
                     // 1.3
                     if group_state.indices.is_empty() {
                         groups_with_rows.push(*group_idx);
                     };
+
+                    // ensure we have enough indices allocated
+                    if group_state.indices.capacity() == group_state.indices.len() {
+                        // allocate more
+
+                        // growth factor: 2, but at least 2 elements
+                        let bump_elements = (group_state.indices.capacity() * 2).max(2);

Review Comment:
   Also, this way the end of the batch would be a "safe point" at which we could trigger a spill.
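A toy sketch of why the batch boundary is a convenient safe point: once a batch has been fully applied, the in-memory state is consistent and can be written out as one unit, then merged back at the end. The names are hypothetical, the number of groups stands in for a byte budget, and an in-memory `Vec` stands in for a spill file; #4202 itself does not spill (its `spill()` returns `ResourcesExhausted`).

```rust
use std::collections::HashMap;

/// Hypothetical operator state: partial sums per group plus "spilled"
/// sorted runs (stand-ins for files written to disk).
pub struct Agg {
    pub groups: HashMap<u64, i64>,
    pub spilled: Vec<Vec<(u64, i64)>>,
    pub limit_groups: usize,
}

impl Agg {
    /// Apply one batch completely, then check the budget at the batch
    /// boundary, where `groups` is consistent and can be spilled as a unit.
    pub fn push_batch(&mut self, batch: &[(u64, i64)]) {
        for &(key, value) in batch {
            *self.groups.entry(key).or_insert(0) += value;
        }
        if self.groups.len() > self.limit_groups {
            let mut run: Vec<(u64, i64)> = self.groups.drain().collect();
            run.sort_unstable();
            self.spilled.push(run);
        }
    }

    /// Merge the in-memory state with every spilled run.
    pub fn finish(mut self) -> Vec<(u64, i64)> {
        for run in self.spilled.drain(..) {
            for (key, value) in run {
                *self.groups.entry(key).or_insert(0) += value;
            }
        }
        let mut out: Vec<(u64, i64)> = self.groups.into_iter().collect();
        out.sort_unstable();
        out
    }
}
```

Spilling mid-batch would instead have to cope with some rows of the batch being applied and others not.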





[GitHub] [arrow-datafusion] crepererum commented on a diff in pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
crepererum commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1023835822


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -70,6 +72,16 @@ use hashbrown::raw::RawTable;
 /// [Compact]: datafusion_row::layout::RowType::Compact
 /// [WordAligned]: datafusion_row::layout::RowType::WordAligned
 pub(crate) struct GroupedHashAggregateStreamV2 {

Review Comment:
   Will do that in a follow-up, since migrating V1 will probably end up with the same helper.





[GitHub] [arrow-datafusion] crepererum commented on pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
crepererum commented on PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#issuecomment-1318382277

   > I'll move the alloc statement, give me a few minutes...
   
   done




[GitHub] [arrow-datafusion] milenkovicm commented on a diff in pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
milenkovicm commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1024184397


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -285,21 +334,40 @@ fn group_aggregate_batch(
                         indices: vec![row as u32], // 1.3
                     };
                     let group_idx = group_states.len();
-                    group_states.push(group_state);
-                    groups_with_rows.push(group_idx);
+
+                    // NOTE: do NOT include the `RowGroupState` struct size in here because this is captured by
+                    // `group_states` (see allocation down below)
+                    allocated += (std::mem::size_of::<u8>()
+                        * group_state.group_by_values.capacity())
+                        + (std::mem::size_of::<u8>()
+                            * group_state.aggregation_buffer.capacity())
+                        + (std::mem::size_of::<u32>() * group_state.indices.capacity());
 
                     // for hasher function, use precomputed hash value
-                    map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);
+                    map.insert_accounted(
+                        (hash, group_idx),
+                        |(hash, _group_index)| *hash,
+                        &mut allocated,
+                    );
+
+                    group_states.push_accounted(group_state, &mut allocated);
+
+                    groups_with_rows.push(group_idx);
                 }
             };
         }
 
+        // allocate memory
+        // This happens AFTER we actually used the memory, but simplifies the whole accounting and we are OK with
+        // overshooting a bit. Also this means we either store the whole record batch or not.
+        memory_consumer.alloc(allocated).await?;

Review Comment:
   As I mentioned above, should this call go just before the return statement? If it triggers a spill, our internal state should be consistent.
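The accounting under discussion charges each new group for the heap buffers it owns. A sketch of that formula follows, using a simplified `RowGroupState` that keeps only the three fields named in the diff (other fields are omitted, so this is not the real struct).

```rust
use std::mem::size_of;

/// Simplified stand-in for the diff's `RowGroupState`; only the fields
/// used in the accounting are kept.
pub struct RowGroupState {
    pub group_by_values: Vec<u8>,
    pub aggregation_buffer: Vec<u8>,
    pub indices: Vec<u32>,
}

impl RowGroupState {
    /// Heap bytes owned by this group, excluding the struct itself, which
    /// is already counted when the outer `Vec<RowGroupState>` grows.
    pub fn heap_bytes(&self) -> usize {
        size_of::<u8>() * self.group_by_values.capacity()
            + size_of::<u8>() * self.aggregation_buffer.capacity()
            + size_of::<u32>() * self.indices.capacity()
    }
}
```

Counting the struct's own size here as well would charge each group twice, which is what the `NOTE` in the diff guards against.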





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1024523906


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -418,6 +487,130 @@ impl std::fmt::Debug for AggregationState {
     }
 }
 
+/// Accounting data structure for memory usage.
+struct AggregationStateMemoryConsumer {

Review Comment:
   ❤️ 



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -418,6 +487,130 @@ impl std::fmt::Debug for AggregationState {
     }
 }
 
+/// Accounting data structure for memory usage.
+struct AggregationStateMemoryConsumer {
+    /// Consumer ID.
+    id: MemoryConsumerId,
+
+    /// Linked memory manager.
+    memory_manager: Arc<MemoryManager>,
+
+    /// Currently used size in bytes.
+    used: usize,
+}
+
+#[async_trait]
+impl MemoryConsumer for AggregationStateMemoryConsumer {
+    fn name(&self) -> String {
+        "AggregationState".to_owned()
+    }
+
+    fn id(&self) -> &crate::execution::MemoryConsumerId {
+        &self.id
+    }
+
+    fn memory_manager(&self) -> Arc<MemoryManager> {
+        Arc::clone(&self.memory_manager)
+    }
+
+    fn type_(&self) -> &ConsumerType {
+        &ConsumerType::Tracking
+    }
+
+    async fn spill(&self) -> Result<usize> {
+        Err(DataFusionError::ResourcesExhausted(
+            "Cannot spill AggregationState".to_owned(),
+        ))
+    }
+
+    fn mem_used(&self) -> usize {
+        self.used
+    }
+}
+
+impl AggregationStateMemoryConsumer {
+    async fn alloc(&mut self, bytes: usize) -> Result<()> {
+        self.try_grow(bytes).await?;
+        self.used = self.used.checked_add(bytes).expect("overflow");
+        Ok(())
+    }
+}
+
+impl Drop for AggregationStateMemoryConsumer {
+    fn drop(&mut self) {
+        self.memory_manager
+            .drop_consumer(self.id(), self.mem_used());
+    }
+}
+
+trait VecAllocExt {
+    type T;
+
+    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);
+}
+
+impl<T> VecAllocExt for Vec<T> {

Review Comment:
   this is very nice
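The diff pairs `VecAllocExt` with a `RawTableAllocExt::insert_accounted` for hashbrown's `RawTable`. A rough std-only analogue for `HashMap` is sketched below; `MapAllocExt` is a hypothetical name, and the estimate `capacity * size_of::<(K, V)>()` ignores the table's control bytes and slack slots beyond `capacity()`, so it undercounts the real allocation.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::mem::size_of;

/// Hypothetical extension trait mirroring the diff's naming.
pub trait MapAllocExt<K, V> {
    fn insert_accounted(&mut self, k: K, v: V, accounting: &mut usize) -> Option<V>;
}

impl<K: Eq + Hash, V> MapAllocExt<K, V> for HashMap<K, V> {
    fn insert_accounted(&mut self, k: K, v: V, accounting: &mut usize) -> Option<V> {
        let before = self.capacity();
        let old = self.insert(k, v);
        let after = self.capacity();
        // Rough estimate: charge only the entry slots gained by a resize.
        if after > before {
            *accounting = (*accounting)
                .checked_add((after - before) * size_of::<(K, V)>())
                .expect("overflow");
        }
        old
    }
}
```

Since inserts never shrink the map, the running total for a map that starts empty equals `capacity() * size_of::<(K, V)>()`.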





[GitHub] [arrow-datafusion] crepererum commented on pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

Posted by GitBox <gi...@apache.org>.
crepererum commented on PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#issuecomment-1318343218

   I'll move the alloc statement, give me a few minutes...

