Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/15 18:56:12 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4202: Return `ResourceExhausted` errors when memory limit is exceed in `GroupedHashAggregateStreamV2` (Row Hash)

alamb commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1023140383


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -418,6 +529,104 @@ impl std::fmt::Debug for AggregationState {
     }
 }
 
+/// Accounting data structure for memory usage.
+struct AggregationStateMemoryConsumer {
+    /// Consumer ID.
+    id: MemoryConsumerId,
+
+    /// Linked memory manager.
+    memory_manager: Arc<MemoryManager>,
+
+    /// Currently used size in bytes.
+    used: usize,
+}
+
+#[async_trait]
+impl MemoryConsumer for AggregationStateMemoryConsumer {
+    fn name(&self) -> String {
+        "AggregationState".to_owned()
+    }
+
+    fn id(&self) -> &crate::execution::MemoryConsumerId {
+        &self.id
+    }
+
+    fn memory_manager(&self) -> Arc<MemoryManager> {
+        Arc::clone(&self.memory_manager)
+    }
+
+    fn type_(&self) -> &ConsumerType {
+        &ConsumerType::Tracking
+    }
+
+    async fn spill(&self) -> Result<usize> {
+        Err(DataFusionError::ResourcesExhausted(
+            "Cannot spill AggregationState".to_owned(),
+        ))
+    }
+
+    fn mem_used(&self) -> usize {
+        self.used
+    }
+}
+
+impl Drop for AggregationStateMemoryConsumer {
+    fn drop(&mut self) {
+        self.memory_manager
+            .drop_consumer(self.id(), self.mem_used());
+    }
+}
+
+/// Memory pool that can be used in a function scope.
+///
+/// This is helpful if there are many small memory allocations (so the overhead if tracking them in [`MemoryManager`] is
+/// high due to lock contention) and pre-calculating the entire allocation for a whole [`RecordBatch`] is complicated or
+/// expensive.
+///
+/// The pool will try to allocate a whole block of memory and gives back overallocated memory on [drop](Self::drop).

Review Comment:
   👌 
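
   For readers skimming this thread, the block-then-give-back idea in the doc comment above could be sketched roughly as follows. This is only an illustration under stated assumptions: `SharedBudget` and `ScopedPool` are hypothetical stand-ins, not the types added in this PR, and a plain `Cell` replaces the real, lock-protected `MemoryManager`.

```rust
use std::cell::Cell;

/// Hypothetical stand-in for the shared memory manager: a byte budget with a limit.
struct SharedBudget {
    limit: usize,
    used: Cell<usize>,
}

impl SharedBudget {
    /// Reserves `bytes` from the budget, or fails if that would exceed the limit.
    fn try_grow(&self, bytes: usize) -> Result<(), String> {
        match self.used.get().checked_add(bytes) {
            Some(new_used) if new_used <= self.limit => {
                self.used.set(new_used);
                Ok(())
            }
            _ => Err(format!("cannot reserve {} more bytes", bytes)),
        }
    }

    /// Returns `bytes` to the budget.
    fn shrink(&self, bytes: usize) {
        self.used.set(self.used.get() - bytes);
    }
}

/// Hypothetical function-scoped pool: reserves whole blocks from the shared
/// budget and hands back the unused remainder when it is dropped.
struct ScopedPool<'a> {
    budget: &'a SharedBudget,
    block_size: usize,
    reserved: usize, // bytes reserved from the shared budget
    used: usize,     // bytes actually handed out by this pool
}

impl<'a> ScopedPool<'a> {
    fn new(budget: &'a SharedBudget, block_size: usize) -> Self {
        Self { budget, block_size, reserved: 0, used: 0 }
    }

    /// Accounts for `bytes`; only touches the shared budget when the
    /// current reservation is exhausted.
    fn alloc(&mut self, bytes: usize) -> Result<(), String> {
        let needed = self.used + bytes;
        if needed > self.reserved {
            // Reserve at least one whole block so that many small
            // allocations share a single trip to the shared budget.
            let grow = (needed - self.reserved).max(self.block_size);
            self.budget.try_grow(grow)?;
            self.reserved += grow;
        }
        self.used = needed;
        Ok(())
    }
}

impl Drop for ScopedPool<'_> {
    fn drop(&mut self) {
        // Give back whatever was reserved but never handed out.
        self.budget.shrink(self.reserved - self.used);
    }
}
```

   In this sketch, two 8-byte allocations against a 64-byte block reserve 64 bytes once, and dropping the pool leaves only the 16 bytes actually used accounted for in the shared budget.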



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -70,6 +72,16 @@ use hashbrown::raw::RawTable;
 /// [Compact]: datafusion_row::layout::RowType::Compact
 /// [WordAligned]: datafusion_row::layout::RowType::WordAligned
 pub(crate) struct GroupedHashAggregateStreamV2 {

Review Comment:
   This looks very much like other stream adapters we have in DataFusion. Perhaps we could give it a more general name, such as `SendableRecordBatchStreamWrapper`, and move it to
   
   https://github.com/apache/arrow-datafusion/blob/c9361e0210861962074eb10d7e480949bb862b97/datafusion/core/src/physical_plan/stream.rs#L34
   
   We can always do this in a follow-on PR as well.



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -270,10 +315,25 @@ fn group_aggregate_batch(
                 // Existing entry for this group value
                 Some((_hash, group_idx)) => {
                     let group_state = &mut group_states[*group_idx];
+
                     // 1.3
                     if group_state.indices.is_empty() {
                         groups_with_rows.push(*group_idx);
                     };
+
+                    // ensure we have enough indices allocated
+                    if group_state.indices.capacity() == group_state.indices.len() {
+                        // allocate more
+
+                        // growth factor: 2, but at least 2 elements
+                        let bump_elements = (group_state.indices.capacity() * 2).max(2);

Review Comment:
   I wonder if we could encapsulate the memory manager interactions in functions on `GroupAggrState` rather than treating it like a plain struct. I don't think that is necessary.
   
   However, encapsulating might:
   1. Keep this code manageable for future readers
   2. Allow the memory allocation routines to be unit tested (for example, checking that the tracked allocation is incremented correctly when new groups are added)
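   
   A minimal sketch of what that encapsulation might look like, assuming a simplified accounting API. `MemoryTracker`, `GroupStateSketch`, and `push_index` are hypothetical names for illustration, not the types in this PR; only the growth rule is taken from the diff above.

```rust
/// Hypothetical stand-in for the memory manager: a byte budget with a limit.
struct MemoryTracker {
    limit: usize,
    used: usize,
}

impl MemoryTracker {
    /// Records `bytes` as used, or fails if that would exceed the limit.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        match self.used.checked_add(bytes) {
            Some(new_used) if new_used <= self.limit => {
                self.used = new_used;
                Ok(())
            }
            _ => Err(format!(
                "cannot allocate {} bytes: {} of {} bytes in use",
                bytes, self.used, self.limit
            )),
        }
    }
}

/// Hypothetical, simplified group state that owns its row indices.
struct GroupStateSketch {
    indices: Vec<u32>,
}

impl GroupStateSketch {
    fn new() -> Self {
        Self { indices: Vec::new() }
    }

    /// Appends a row index. When the vector is full, the growth is first
    /// registered with the tracker and only then reserved, so a rejected
    /// request leaves the state untouched.
    fn push_index(&mut self, row: u32, tracker: &mut MemoryTracker) -> Result<(), String> {
        if self.indices.capacity() == self.indices.len() {
            // growth factor: 2, but at least 2 elements (same rule as the diff)
            let bump_elements = (self.indices.capacity() * 2).max(2);
            tracker.try_grow(bump_elements * std::mem::size_of::<u32>())?;
            // `reserve` may allocate more than requested, so the tracked
            // size is an estimate rather than an exact figure.
            self.indices.reserve(bump_elements);
        }
        self.indices.push(row);
        Ok(())
    }
}
```

   With the accounting behind one method, a unit test can push into an empty state and assert that the tracker grew by two `u32` slots, or use a tiny limit and assert that the push is rejected without modifying `indices`.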
   
   



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -270,10 +315,25 @@ fn group_aggregate_batch(
                 // Existing entry for this group value
                 Some((_hash, group_idx)) => {
                     let group_state = &mut group_states[*group_idx];
+
                     // 1.3
                     if group_state.indices.is_empty() {
                         groups_with_rows.push(*group_idx);
                     };
+
+                    // ensure we have enough indices allocated
+                    if group_state.indices.capacity() == group_state.indices.len() {
+                        // allocate more
+
+                        // growth factor: 2, but at least 2 elements

Review Comment:
   Growth factors like this are sometimes capped at some large value (like 1G) to avoid the 2x memory overhead that doubling incurs at large sizes.
   
   If we use 2x growth with no cap, we can get into situations where the table would fit in 36GB, but the code tries to grow from 32GB to 64GB and hits the limit even though the query could complete. This could always be handled in a follow-on PR. Users can always disable the memory manager, let the allocations happen, and suffer OOMs if they want the current behavior.
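   
   A rough sketch of a capped growth step, in case it helps. The constant `MAX_BUMP_ELEMENTS` and its value are assumptions chosen for illustration, not something this PR defines, and the function only computes the number of additional elements to request.

```rust
/// Hypothetical cap on how many elements a single growth step may add.
const MAX_BUMP_ELEMENTS: usize = 1 << 20;

/// Returns how many additional elements to request when a buffer holding
/// `capacity` elements is full: twice the capacity, at least 2 elements,
/// and never more than `MAX_BUMP_ELEMENTS`.
fn bump_elements(capacity: usize) -> usize {
    // `saturating_mul` avoids overflow for very large capacities.
    capacity.saturating_mul(2).clamp(2, MAX_BUMP_ELEMENTS)
}
```

   Small buffers still grow geometrically, while a buffer that is already large grows by at most the cap per step, so a request is not rejected merely because a doubling step overshoots the remaining budget.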
   


