You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/23 16:20:17 UTC

[arrow-datafusion] branch master updated: refactor: isolate common memory accounting utils (#4341)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new ad5a50870 refactor: isolate common memory accounting utils (#4341)
ad5a50870 is described below

commit ad5a50870b8af43cb59b491b2161ef7dc1fc45df
Author: Marco Neumann <ma...@crepererum.net>
AuthorDate: Wed Nov 23 16:20:12 2022 +0000

    refactor: isolate common memory accounting utils (#4341)
---
 .../{memory_manager.rs => memory_manager/mod.rs}   |   2 +
 .../core/src/execution/memory_manager/proxy.rs     | 180 +++++++++++++++++++++
 .../core/src/physical_plan/aggregates/row_hash.rs  | 150 ++---------------
 3 files changed, 194 insertions(+), 138 deletions(-)

diff --git a/datafusion/core/src/execution/memory_manager.rs b/datafusion/core/src/execution/memory_manager/mod.rs
similarity index 99%
rename from datafusion/core/src/execution/memory_manager.rs
rename to datafusion/core/src/execution/memory_manager/mod.rs
index e7148b066..67f4965d9 100644
--- a/datafusion/core/src/execution/memory_manager.rs
+++ b/datafusion/core/src/execution/memory_manager/mod.rs
@@ -28,6 +28,8 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
+pub mod proxy;
+
 static CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);
 
 #[derive(Debug, Clone)]
diff --git a/datafusion/core/src/execution/memory_manager/proxy.rs b/datafusion/core/src/execution/memory_manager/proxy.rs
new file mode 100644
index 000000000..6ea52e909
--- /dev/null
+++ b/datafusion/core/src/execution/memory_manager/proxy.rs
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilities that help with tracking of memory allocations.
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion_common::DataFusionError;
+use hashbrown::raw::{Bucket, RawTable};
+
+use super::{ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager};
+
+/// Accounting proxy for memory usage.
+///
+/// This is helpful when computing the memory usage of the actual data structure is expensive, but tracking
+/// allocations while processing data is easy.
+///
+/// This consumer will NEVER spill: its `spill` implementation always returns an error.
+pub struct MemoryConsumerProxy {
+    /// Name reported by [`MemoryConsumer::name`].
+    name: String,
+
+    /// Consumer ID.
+    id: MemoryConsumerId,
+
+    /// Linked memory manager.
+    memory_manager: Arc<MemoryManager>,
+
+    /// Bytes accounted so far; starts at 0 and is increased by [`MemoryConsumerProxy::alloc`].
+    used: usize,
+}
+
+impl MemoryConsumerProxy {
+    /// Create a new proxy consumer (0 bytes used) and register `id` with the given memory manager.
+    pub fn new(
+        name: impl Into<String>,
+        id: MemoryConsumerId,
+        memory_manager: Arc<MemoryManager>,
+    ) -> Self {
+        memory_manager.register_requester(&id);
+
+        Self {
+            name: name.into(),
+            id,
+            memory_manager,
+            used: 0,
+        }
+    }
+
+    /// Request `bytes` via `try_grow`; on success add them to `used` (panics on `usize` overflow).
+    pub async fn alloc(&mut self, bytes: usize) -> Result<(), DataFusionError> {
+        self.try_grow(bytes).await?;
+        self.used = self.used.checked_add(bytes).expect("overflow");
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl MemoryConsumer for MemoryConsumerProxy {
+    fn name(&self) -> String {
+        self.name.clone()
+    }
+
+    fn id(&self) -> &crate::execution::MemoryConsumerId {
+        &self.id
+    }
+
+    fn memory_manager(&self) -> Arc<MemoryManager> {
+        Arc::clone(&self.memory_manager)
+    }
+
+    fn type_(&self) -> &ConsumerType {
+        &ConsumerType::Tracking
+    }
+
+    async fn spill(&self) -> Result<usize, DataFusionError> {
+        // A proxy has nothing to spill; name the actual consumer instead of one hard-coded caller.
+        let msg = format!("Cannot spill {}", self.name);
+        Err(DataFusionError::ResourcesExhausted(msg))
+    }
+
+    fn mem_used(&self) -> usize {
+        self.used
+    }
+}
+
+impl Drop for MemoryConsumerProxy {
+    fn drop(&mut self) {
+        // notify the manager via `drop_consumer`, passing the bytes still tracked in `used`
+        self.memory_manager.drop_consumer(&self.id, self.used);
+    }
+}
+
+/// Extension trait for [`Vec`] to account for allocations.
+pub trait VecAllocExt {
+    /// Item type.
+    type T;
+
+    /// [Push](Vec::push) new element to vector and store additional allocated bytes in `accounting` (additive).
+    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);
+}
+
+impl<T> VecAllocExt for Vec<T> {
+    type T = T;
+
+    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize) {
+        if self.capacity() == self.len() {
+            // vector is full: reserve explicitly so the extra room can be accounted
+            //
+            // ask for 2x the current capacity (at least 2 slots); each slot holds one `T`
+            let bump_elements = (self.capacity() * 2).max(2);
+            let bump_size = std::mem::size_of::<T>() * bump_elements;
+            self.reserve(bump_elements);
+            *accounting = (*accounting).checked_add(bump_size).expect("overflow");
+        }
+
+        self.push(x);
+    }
+}
+
+/// Extension trait for [`RawTable`] to account for allocations.
+pub trait RawTableAllocExt {
+    /// Item type stored in the table.
+    type T;
+
+    /// [Insert](RawTable::insert) `x` into the table, add newly reserved bytes to `accounting`, return its bucket.
+    fn insert_accounted(
+        &mut self,
+        x: Self::T,
+        hasher: impl Fn(&Self::T) -> u64,
+        accounting: &mut usize,
+    ) -> Bucket<Self::T>;
+}
+
+impl<T> RawTableAllocExt for RawTable<T> {
+    type T = T;
+
+    fn insert_accounted(
+        &mut self,
+        x: Self::T,
+        hasher: impl Fn(&Self::T) -> u64,
+        accounting: &mut usize,
+    ) -> Bucket<Self::T> {
+        let hash = hasher(&x);
+
+        match self.try_insert_no_grow(hash, x) {
+            Ok(bucket) => bucket,
+            Err(x) => {
+                // insert without growing failed: reserve more room and record it in `accounting`
+                // NOTE(review): counts `size_of::<T>()` per requested element only — confirm that is intended
+                let bump_elements = (self.capacity() * 2).max(16);
+                let bump_size = bump_elements * std::mem::size_of::<T>();
+                *accounting = (*accounting).checked_add(bump_size).expect("overflow");
+
+                self.reserve(bump_elements, hasher);
+
+                // still need to insert the element since first try failed
+                // Note: cannot use `.expect` here because `T` may not implement `Debug`
+                match self.try_insert_no_grow(hash, x) {
+                    Ok(bucket) => bucket,
+                    Err(_) => panic!("just grew the container"),
+                }
+            }
+        }
+    }
+}
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index 740e8990e..f0311f088 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -22,14 +22,15 @@ use std::task::{Context, Poll};
 use std::vec;
 
 use ahash::RandomState;
-use async_trait::async_trait;
 use futures::stream::BoxStream;
 use futures::stream::{Stream, StreamExt};
 
 use crate::error::Result;
 use crate::execution::context::TaskContext;
-use crate::execution::memory_manager::ConsumerType;
-use crate::execution::{MemoryConsumer, MemoryConsumerId, MemoryManager};
+use crate::execution::memory_manager::proxy::{
+    MemoryConsumerProxy, RawTableAllocExt, VecAllocExt,
+};
+use crate::execution::MemoryConsumerId;
 use crate::physical_plan::aggregates::{
     evaluate_group_by, evaluate_many, group_schema, AccumulatorItemV2, AggregateMode,
     PhysicalGroupBy,
@@ -47,13 +48,13 @@ use arrow::{
     error::{ArrowError, Result as ArrowResult},
 };
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use datafusion_common::{DataFusionError, ScalarValue};
+use datafusion_common::ScalarValue;
 use datafusion_row::accessor::RowAccessor;
 use datafusion_row::layout::RowLayout;
 use datafusion_row::reader::{read_row, RowReader};
 use datafusion_row::writer::{write_row, RowWriter};
 use datafusion_row::{MutableRecordBatch, RowType};
-use hashbrown::raw::{Bucket, RawTable};
+use hashbrown::raw::RawTable;
 
 /// Grouping aggregate with row-format aggregation states inside.
 ///
@@ -142,17 +143,14 @@ impl GroupedHashAggregateStreamV2 {
         let aggr_layout = Arc::new(RowLayout::new(&aggr_schema, RowType::WordAligned));
 
         let aggr_state = AggregationState {
-            memory_consumer: AggregationStateMemoryConsumer {
-                id: MemoryConsumerId::new(partition),
-                memory_manager: Arc::clone(&context.runtime_env().memory_manager),
-                used: 0,
-            },
+            memory_consumer: MemoryConsumerProxy::new(
+                "AggregationState",
+                MemoryConsumerId::new(partition),
+                Arc::clone(&context.runtime_env().memory_manager),
+            ),
             map: RawTable::with_capacity(0),
             group_states: Vec::with_capacity(0),
         };
-        context
-            .runtime_env()
-            .register_requester(aggr_state.memory_consumer.id());
 
         timer.done();
 
@@ -467,7 +465,7 @@ struct RowGroupState {
 
 /// The state of all the groups
 struct AggregationState {
-    memory_consumer: AggregationStateMemoryConsumer,
+    memory_consumer: MemoryConsumerProxy,
 
     /// Logically maps group values to an index in `group_states`
     ///
@@ -493,130 +491,6 @@ impl std::fmt::Debug for AggregationState {
     }
 }
 
-/// Accounting data structure for memory usage.
-struct AggregationStateMemoryConsumer {
-    /// Consumer ID.
-    id: MemoryConsumerId,
-
-    /// Linked memory manager.
-    memory_manager: Arc<MemoryManager>,
-
-    /// Currently used size in bytes.
-    used: usize,
-}
-
-#[async_trait]
-impl MemoryConsumer for AggregationStateMemoryConsumer {
-    fn name(&self) -> String {
-        "AggregationState".to_owned()
-    }
-
-    fn id(&self) -> &crate::execution::MemoryConsumerId {
-        &self.id
-    }
-
-    fn memory_manager(&self) -> Arc<MemoryManager> {
-        Arc::clone(&self.memory_manager)
-    }
-
-    fn type_(&self) -> &ConsumerType {
-        &ConsumerType::Tracking
-    }
-
-    async fn spill(&self) -> Result<usize> {
-        Err(DataFusionError::ResourcesExhausted(
-            "Cannot spill AggregationState".to_owned(),
-        ))
-    }
-
-    fn mem_used(&self) -> usize {
-        self.used
-    }
-}
-
-impl AggregationStateMemoryConsumer {
-    async fn alloc(&mut self, bytes: usize) -> Result<()> {
-        self.try_grow(bytes).await?;
-        self.used = self.used.checked_add(bytes).expect("overflow");
-        Ok(())
-    }
-}
-
-impl Drop for AggregationStateMemoryConsumer {
-    fn drop(&mut self) {
-        self.memory_manager
-            .drop_consumer(self.id(), self.mem_used());
-    }
-}
-
-trait VecAllocExt {
-    type T;
-
-    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);
-}
-
-impl<T> VecAllocExt for Vec<T> {
-    type T = T;
-
-    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize) {
-        if self.capacity() == self.len() {
-            // allocate more
-
-            // growth factor: 2, but at least 2 elements
-            let bump_elements = (self.capacity() * 2).max(2);
-            let bump_size = std::mem::size_of::<u32>() * bump_elements;
-            self.reserve(bump_elements);
-            *accounting = (*accounting).checked_add(bump_size).expect("overflow");
-        }
-
-        self.push(x);
-    }
-}
-
-trait RawTableAllocExt {
-    type T;
-
-    fn insert_accounted(
-        &mut self,
-        x: Self::T,
-        hasher: impl Fn(&Self::T) -> u64,
-        accounting: &mut usize,
-    ) -> Bucket<Self::T>;
-}
-
-impl<T> RawTableAllocExt for RawTable<T> {
-    type T = T;
-
-    fn insert_accounted(
-        &mut self,
-        x: Self::T,
-        hasher: impl Fn(&Self::T) -> u64,
-        accounting: &mut usize,
-    ) -> Bucket<Self::T> {
-        let hash = hasher(&x);
-
-        match self.try_insert_no_grow(hash, x) {
-            Ok(bucket) => bucket,
-            Err(x) => {
-                // need to request more memory
-
-                let bump_elements = (self.capacity() * 2).max(16);
-                let bump_size = bump_elements * std::mem::size_of::<T>();
-                *accounting = (*accounting).checked_add(bump_size).expect("overflow");
-
-                self.reserve(bump_elements, hasher);
-
-                // still need to insert the element since first try failed
-                // Note: cannot use `.expect` here because `T` may not implement `Debug`
-                match self.try_insert_no_grow(hash, x) {
-                    Ok(bucket) => bucket,
-                    Err(_) => panic!("just grew the container"),
-                }
-            }
-        }
-    }
-}
-
 /// Create grouping rows
 fn create_group_rows(arrays: Vec<ArrayRef>, schema: &Schema) -> Vec<Vec<u8>> {
     let mut writer = RowWriter::new(schema, RowType::Compact);