You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/23 16:20:17 UTC
[arrow-datafusion] branch master updated: refactor: isolate common memory accounting utils (#4341)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new ad5a50870 refactor: isolate common memory accounting utils (#4341)
ad5a50870 is described below
commit ad5a50870b8af43cb59b491b2161ef7dc1fc45df
Author: Marco Neumann <ma...@crepererum.net>
AuthorDate: Wed Nov 23 16:20:12 2022 +0000
refactor: isolate common memory accounting utils (#4341)
---
.../{memory_manager.rs => memory_manager/mod.rs} | 2 +
.../core/src/execution/memory_manager/proxy.rs | 180 +++++++++++++++++++++
.../core/src/physical_plan/aggregates/row_hash.rs | 150 ++---------------
3 files changed, 194 insertions(+), 138 deletions(-)
diff --git a/datafusion/core/src/execution/memory_manager.rs b/datafusion/core/src/execution/memory_manager/mod.rs
similarity index 99%
rename from datafusion/core/src/execution/memory_manager.rs
rename to datafusion/core/src/execution/memory_manager/mod.rs
index e7148b066..67f4965d9 100644
--- a/datafusion/core/src/execution/memory_manager.rs
+++ b/datafusion/core/src/execution/memory_manager/mod.rs
@@ -28,6 +28,8 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
+pub mod proxy;
+
static CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug, Clone)]
diff --git a/datafusion/core/src/execution/memory_manager/proxy.rs b/datafusion/core/src/execution/memory_manager/proxy.rs
new file mode 100644
index 000000000..6ea52e909
--- /dev/null
+++ b/datafusion/core/src/execution/memory_manager/proxy.rs
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilities that help with tracking of memory allocations.
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion_common::DataFusionError;
+use hashbrown::raw::{Bucket, RawTable};
+
+use super::{ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager};
+
+/// Accounting proxy for memory usage.
+///
+/// This is helpful when calculating memory usage on the actual data structure is expensive but it is easy to track
+/// allocations while processing data.
+///
+/// This consumer will NEVER spill.
+pub struct MemoryConsumerProxy {
+ /// Name
+ name: String,
+
+ /// Consumer ID.
+ id: MemoryConsumerId,
+
+ /// Linked memory manager.
+ memory_manager: Arc<MemoryManager>,
+
+ /// Currently used size in bytes.
+ used: usize,
+}
+
+impl MemoryConsumerProxy {
+ /// Create new proxy consumer and register it with the given memory manager.
+ pub fn new(
+ name: impl Into<String>,
+ id: MemoryConsumerId,
+ memory_manager: Arc<MemoryManager>,
+ ) -> Self {
+ memory_manager.register_requester(&id);
+
+ Self {
+ name: name.into(),
+ id,
+ memory_manager,
+ used: 0,
+ }
+ }
+
+ /// Try to allocate given amount of memory.
+ pub async fn alloc(&mut self, bytes: usize) -> Result<(), DataFusionError> {
+ self.try_grow(bytes).await?;
+ self.used = self.used.checked_add(bytes).expect("overflow");
+ Ok(())
+ }
+}
+
+#[async_trait]
+impl MemoryConsumer for MemoryConsumerProxy {
+ fn name(&self) -> String {
+ self.name.clone()
+ }
+
+ fn id(&self) -> &crate::execution::MemoryConsumerId {
+ &self.id
+ }
+
+ fn memory_manager(&self) -> Arc<MemoryManager> {
+ Arc::clone(&self.memory_manager)
+ }
+
+ fn type_(&self) -> &ConsumerType {
+ &ConsumerType::Tracking
+ }
+
+ async fn spill(&self) -> Result<usize, DataFusionError> {
+ Err(DataFusionError::ResourcesExhausted(
+ "Cannot spill AggregationState".to_owned(),
+ ))
+ }
+
+ fn mem_used(&self) -> usize {
+ self.used
+ }
+}
+
+impl Drop for MemoryConsumerProxy {
+ fn drop(&mut self) {
+ self.memory_manager
+ .drop_consumer(self.id(), self.mem_used());
+ }
+}
+
/// Extension trait for [`Vec`] to account for allocations.
pub trait VecAllocExt {
    /// Item type.
    type T;

    /// [Push](Vec::push) new element to vector and store additional allocated bytes in
    /// `accounting` (additive).
    ///
    /// Only growth of the backing buffer is added, measured as the capacity gained times
    /// `size_of::<T>()`. Pushes that fit into the existing capacity leave `accounting`
    /// untouched.
    ///
    /// # Panics
    /// Panics if `accounting` would overflow `usize`, or if [`Vec::push`] itself panics.
    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);
}

impl<T> VecAllocExt for Vec<T> {
    type T = T;

    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize) {
        let prev_capacity = self.capacity();
        // Let `Vec` apply its own amortized growth, then measure what it actually allocated.
        self.push(x);
        let new_capacity = self.capacity();

        if new_capacity > prev_capacity {
            // Size the growth with the real element type `T`, not a fixed-width stand-in.
            // The multiplication cannot overflow: `Vec` keeps `capacity * size_of::<T>()`
            // within `isize::MAX` (and zero-sized `T` never changes capacity).
            let bump_size = (new_capacity - prev_capacity) * std::mem::size_of::<T>();
            *accounting = (*accounting).checked_add(bump_size).expect("overflow");
        }
    }
}
+
+/// Extension trait for [`RawTable`] to account for allocations.
+pub trait RawTableAllocExt {
+ /// Item type.
+ type T;
+
+ /// [Insert](RawTable::insert) new element into table and store additional allocated bytes in `accounting` (additive).
+ fn insert_accounted(
+ &mut self,
+ x: Self::T,
+ hasher: impl Fn(&Self::T) -> u64,
+ accounting: &mut usize,
+ ) -> Bucket<Self::T>;
+}
+
+impl<T> RawTableAllocExt for RawTable<T> {
+ type T = T;
+
+ fn insert_accounted(
+ &mut self,
+ x: Self::T,
+ hasher: impl Fn(&Self::T) -> u64,
+ accounting: &mut usize,
+ ) -> Bucket<Self::T> {
+ let hash = hasher(&x);
+
+ match self.try_insert_no_grow(hash, x) {
+ Ok(bucket) => bucket,
+ Err(x) => {
+ // need to request more memory
+
+ let bump_elements = (self.capacity() * 2).max(16);
+ let bump_size = bump_elements * std::mem::size_of::<T>();
+ *accounting = (*accounting).checked_add(bump_size).expect("overflow");
+
+ self.reserve(bump_elements, hasher);
+
+ // still need to insert the element since first try failed
+ // Note: cannot use `.expect` here because `T` may not implement `Debug`
+ match self.try_insert_no_grow(hash, x) {
+ Ok(bucket) => bucket,
+ Err(_) => panic!("just grew the container"),
+ }
+ }
+ }
+ }
+}
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index 740e8990e..f0311f088 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -22,14 +22,15 @@ use std::task::{Context, Poll};
use std::vec;
use ahash::RandomState;
-use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::stream::{Stream, StreamExt};
use crate::error::Result;
use crate::execution::context::TaskContext;
-use crate::execution::memory_manager::ConsumerType;
-use crate::execution::{MemoryConsumer, MemoryConsumerId, MemoryManager};
+use crate::execution::memory_manager::proxy::{
+ MemoryConsumerProxy, RawTableAllocExt, VecAllocExt,
+};
+use crate::execution::MemoryConsumerId;
use crate::physical_plan::aggregates::{
evaluate_group_by, evaluate_many, group_schema, AccumulatorItemV2, AggregateMode,
PhysicalGroupBy,
@@ -47,13 +48,13 @@ use arrow::{
error::{ArrowError, Result as ArrowResult},
};
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use datafusion_common::{DataFusionError, ScalarValue};
+use datafusion_common::ScalarValue;
use datafusion_row::accessor::RowAccessor;
use datafusion_row::layout::RowLayout;
use datafusion_row::reader::{read_row, RowReader};
use datafusion_row::writer::{write_row, RowWriter};
use datafusion_row::{MutableRecordBatch, RowType};
-use hashbrown::raw::{Bucket, RawTable};
+use hashbrown::raw::RawTable;
/// Grouping aggregate with row-format aggregation states inside.
///
@@ -142,17 +143,14 @@ impl GroupedHashAggregateStreamV2 {
let aggr_layout = Arc::new(RowLayout::new(&aggr_schema, RowType::WordAligned));
let aggr_state = AggregationState {
- memory_consumer: AggregationStateMemoryConsumer {
- id: MemoryConsumerId::new(partition),
- memory_manager: Arc::clone(&context.runtime_env().memory_manager),
- used: 0,
- },
+ memory_consumer: MemoryConsumerProxy::new(
+ "AggregationState",
+ MemoryConsumerId::new(partition),
+ Arc::clone(&context.runtime_env().memory_manager),
+ ),
map: RawTable::with_capacity(0),
group_states: Vec::with_capacity(0),
};
- context
- .runtime_env()
- .register_requester(aggr_state.memory_consumer.id());
timer.done();
@@ -467,7 +465,7 @@ struct RowGroupState {
/// The state of all the groups
struct AggregationState {
- memory_consumer: AggregationStateMemoryConsumer,
+ memory_consumer: MemoryConsumerProxy,
/// Logically maps group values to an index in `group_states`
///
@@ -493,130 +491,6 @@ impl std::fmt::Debug for AggregationState {
}
}
-/// Accounting data structure for memory usage.
-struct AggregationStateMemoryConsumer {
- /// Consumer ID.
- id: MemoryConsumerId,
-
- /// Linked memory manager.
- memory_manager: Arc<MemoryManager>,
-
- /// Currently used size in bytes.
- used: usize,
-}
-
-#[async_trait]
-impl MemoryConsumer for AggregationStateMemoryConsumer {
- fn name(&self) -> String {
- "AggregationState".to_owned()
- }
-
- fn id(&self) -> &crate::execution::MemoryConsumerId {
- &self.id
- }
-
- fn memory_manager(&self) -> Arc<MemoryManager> {
- Arc::clone(&self.memory_manager)
- }
-
- fn type_(&self) -> &ConsumerType {
- &ConsumerType::Tracking
- }
-
- async fn spill(&self) -> Result<usize> {
- Err(DataFusionError::ResourcesExhausted(
- "Cannot spill AggregationState".to_owned(),
- ))
- }
-
- fn mem_used(&self) -> usize {
- self.used
- }
-}
-
-impl AggregationStateMemoryConsumer {
- async fn alloc(&mut self, bytes: usize) -> Result<()> {
- self.try_grow(bytes).await?;
- self.used = self.used.checked_add(bytes).expect("overflow");
- Ok(())
- }
-}
-
-impl Drop for AggregationStateMemoryConsumer {
- fn drop(&mut self) {
- self.memory_manager
- .drop_consumer(self.id(), self.mem_used());
- }
-}
-
-trait VecAllocExt {
- type T;
-
- fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);
-}
-
-impl<T> VecAllocExt for Vec<T> {
- type T = T;
-
- fn push_accounted(&mut self, x: Self::T, accounting: &mut usize) {
- if self.capacity() == self.len() {
- // allocate more
-
- // growth factor: 2, but at least 2 elements
- let bump_elements = (self.capacity() * 2).max(2);
- let bump_size = std::mem::size_of::<u32>() * bump_elements;
- self.reserve(bump_elements);
- *accounting = (*accounting).checked_add(bump_size).expect("overflow");
- }
-
- self.push(x);
- }
-}
-
-trait RawTableAllocExt {
- type T;
-
- fn insert_accounted(
- &mut self,
- x: Self::T,
- hasher: impl Fn(&Self::T) -> u64,
- accounting: &mut usize,
- ) -> Bucket<Self::T>;
-}
-
-impl<T> RawTableAllocExt for RawTable<T> {
- type T = T;
-
- fn insert_accounted(
- &mut self,
- x: Self::T,
- hasher: impl Fn(&Self::T) -> u64,
- accounting: &mut usize,
- ) -> Bucket<Self::T> {
- let hash = hasher(&x);
-
- match self.try_insert_no_grow(hash, x) {
- Ok(bucket) => bucket,
- Err(x) => {
- // need to request more memory
-
- let bump_elements = (self.capacity() * 2).max(16);
- let bump_size = bump_elements * std::mem::size_of::<T>();
- *accounting = (*accounting).checked_add(bump_size).expect("overflow");
-
- self.reserve(bump_elements, hasher);
-
- // still need to insert the element since first try failed
- // Note: cannot use `.expect` here because `T` may not implement `Debug`
- match self.try_insert_no_grow(hash, x) {
- Ok(bucket) => bucket,
- Err(_) => panic!("just grew the container"),
- }
- }
- }
- }
-}
-
/// Create grouping rows
fn create_group_rows(arrays: Vec<ArrayRef>, schema: &Schema) -> Vec<Vec<u8>> {
let mut writer = RowWriter::new(schema, RowType::Compact);