You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/25 10:38:26 UTC
[arrow-datafusion] branch master updated: add `{TDigest,ScalarValue,Accumulator}::size` (#4342)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new e0150439e add `{TDigest,ScalarValue,Accumulator}::size` (#4342)
e0150439e is described below
commit e0150439e89e1d98ff362bddcafdb980ee499a7a
Author: Marco Neumann <ma...@crepererum.net>
AuthorDate: Fri Nov 25 10:38:21 2022 +0000
add `{TDigest,ScalarValue,Accumulator}::size` (#4342)
* feat: add `TDigest::size`
* feat: add `ScalarValue::size`
* feat: add `Accumulator::size`
* docs: improve
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
* refactor: simplify memory accounting
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
datafusion-examples/examples/simple_udaf.rs | 4 +
datafusion/common/src/scalar.rs | 89 ++++++++++++++++++++++
datafusion/core/tests/user_defined_aggregates.rs | 4 +
datafusion/expr/src/accumulator.rs | 5 ++
.../physical-expr/src/aggregate/approx_distinct.rs | 5 ++
.../src/aggregate/approx_percentile_cont.rs | 6 ++
.../approx_percentile_cont_with_weight.rs | 6 ++
.../physical-expr/src/aggregate/array_agg.rs | 6 ++
.../src/aggregate/array_agg_distinct.rs | 6 ++
datafusion/physical-expr/src/aggregate/average.rs | 4 +
.../physical-expr/src/aggregate/correlation.rs | 9 +++
datafusion/physical-expr/src/aggregate/count.rs | 4 +
.../physical-expr/src/aggregate/count_distinct.rs | 14 ++++
.../physical-expr/src/aggregate/covariance.rs | 4 +
datafusion/physical-expr/src/aggregate/median.rs | 14 ++++
datafusion/physical-expr/src/aggregate/min_max.rs | 8 ++
datafusion/physical-expr/src/aggregate/stddev.rs | 5 ++
datafusion/physical-expr/src/aggregate/sum.rs | 4 +
.../physical-expr/src/aggregate/sum_distinct.rs | 6 ++
datafusion/physical-expr/src/aggregate/tdigest.rs | 14 ++++
datafusion/physical-expr/src/aggregate/variance.rs | 4 +
datafusion/proto/src/lib.rs | 4 +
22 files changed, 225 insertions(+)
diff --git a/datafusion-examples/examples/simple_udaf.rs b/datafusion-examples/examples/simple_udaf.rs
index 416d514e3..bb3a42b8b 100644
--- a/datafusion-examples/examples/simple_udaf.rs
+++ b/datafusion-examples/examples/simple_udaf.rs
@@ -153,6 +153,10 @@ impl Accumulator for GeometricMean {
self.merge(&v)
})
}
+
+ fn size(&self) -> usize {
+ // inline footprint of `Self` only; nothing heap-allocated is added here
+ std::mem::size_of::<Self>()
+ }
}
#[tokio::main]
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 96d1ab672..9a1119469 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -19,6 +19,7 @@
use std::borrow::Borrow;
use std::cmp::{max, Ordering};
+use std::collections::HashSet;
use std::convert::{Infallible, TryInto};
use std::ops::{Add, Sub};
use std::str::FromStr;
@@ -2290,6 +2291,94 @@ impl ScalarValue {
ScalarValue::Null => array.data().is_null(index),
}
}
+
+ /// Estimates the size in bytes, including `Self`. For values with internal containers such as
+ /// `String`, the allocated size (`capacity`) is counted rather than the current length (`len`).
+ pub fn size(&self) -> usize {
+ // `self` is already a reference; `size_of_val(&self)` would measure the pointer, not the value
+ std::mem::size_of_val(self)
+ + match self {
+ ScalarValue::Null
+ | ScalarValue::Boolean(_)
+ | ScalarValue::Float32(_)
+ | ScalarValue::Float64(_)
+ | ScalarValue::Decimal128(_, _, _)
+ | ScalarValue::Int8(_)
+ | ScalarValue::Int16(_)
+ | ScalarValue::Int32(_)
+ | ScalarValue::Int64(_)
+ | ScalarValue::UInt8(_)
+ | ScalarValue::UInt16(_)
+ | ScalarValue::UInt32(_)
+ | ScalarValue::UInt64(_)
+ | ScalarValue::Date32(_)
+ | ScalarValue::Date64(_)
+ | ScalarValue::Time32Second(_)
+ | ScalarValue::Time32Millisecond(_)
+ | ScalarValue::Time64Microsecond(_)
+ | ScalarValue::Time64Nanosecond(_)
+ | ScalarValue::IntervalYearMonth(_)
+ | ScalarValue::IntervalDayTime(_)
+ | ScalarValue::IntervalMonthDayNano(_) => 0,
+ ScalarValue::Utf8(s)
+ | ScalarValue::LargeUtf8(s)
+ | ScalarValue::TimestampSecond(_, s)
+ | ScalarValue::TimestampMillisecond(_, s)
+ | ScalarValue::TimestampMicrosecond(_, s)
+ | ScalarValue::TimestampNanosecond(_, s) => {
+ s.as_ref().map(|s| s.capacity()).unwrap_or_default()
+ }
+ ScalarValue::Binary(b)
+ | ScalarValue::FixedSizeBinary(_, b)
+ | ScalarValue::LargeBinary(b) => {
+ b.as_ref().map(|b| b.capacity()).unwrap_or_default()
+ }
+ // TODO(crepererum): `Field` is NOT fixed size, add `Field::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
+ ScalarValue::List(vals, field) => {
+ vals.as_ref()
+ .map(|vals| Self::size_of_vec(vals) - std::mem::size_of_val(vals))
+ .unwrap_or_default()
+ + std::mem::size_of_val(field)
+ }
+ // TODO(crepererum): `Field` is NOT fixed size, add `Field::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
+ ScalarValue::Struct(vals, fields) => {
+ vals.as_ref()
+ .map(|vals| {
+ vals.iter()
+ .map(|sv| sv.size() - std::mem::size_of_val(sv))
+ .sum::<usize>()
+ + (std::mem::size_of::<ScalarValue>() * vals.capacity())
+ })
+ .unwrap_or_default()
+ + (std::mem::size_of::<Field>() * fields.capacity())
+ }
+ // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
+ ScalarValue::Dictionary(dt, sv) => {
+ std::mem::size_of_val(dt.as_ref()) + sv.size()
+ }
+ }
+ }
+
+ /// Estimates [size](Self::size) of [`Vec`] in bytes.
+ ///
+ /// Includes the size of the [`Vec`] container itself, so callers that already counted the
+ /// container as an inline field must subtract `size_of_val(vec)` from the result.
+ pub fn size_of_vec(vec: &Vec<Self>) -> usize {
+ std::mem::size_of_val(vec) + (std::mem::size_of::<ScalarValue>() * vec.capacity())
+ + vec
+ .iter()
+ .map(|sv| sv.size() - std::mem::size_of_val(sv))
+ .sum::<usize>()
+ }
+
+ /// Estimates [size](Self::size) of [`HashSet`] in bytes.
+ ///
+ /// Includes the size of the [`HashSet`] container itself, so callers that already counted the
+ /// container as an inline field must subtract `size_of_val(set)` from the result.
+ pub fn size_of_hashset<S>(set: &HashSet<Self, S>) -> usize {
+ std::mem::size_of_val(set) + (std::mem::size_of::<ScalarValue>() * set.capacity())
+ + set
+ .iter()
+ .map(|sv| sv.size() - std::mem::size_of_val(sv))
+ .sum::<usize>()
+ }
}
macro_rules! impl_scalar {
diff --git a/datafusion/core/tests/user_defined_aggregates.rs b/datafusion/core/tests/user_defined_aggregates.rs
index ea6838e70..2903d4272 100644
--- a/datafusion/core/tests/user_defined_aggregates.rs
+++ b/datafusion/core/tests/user_defined_aggregates.rs
@@ -244,4 +244,8 @@ impl Accumulator for FirstSelector {
// same logic is needed as in update_batch
self.update_batch(states)
}
+
+ fn size(&self) -> usize {
+ // inline footprint of `Self` only; nothing heap-allocated is added here
+ std::mem::size_of::<Self>()
+ }
}
diff --git a/datafusion/expr/src/accumulator.rs b/datafusion/expr/src/accumulator.rs
index d0540f501..3d107c0c4 100644
--- a/datafusion/expr/src/accumulator.rs
+++ b/datafusion/expr/src/accumulator.rs
@@ -54,6 +54,11 @@ pub trait Accumulator: Send + Sync + Debug {
/// returns its value based on its current state.
fn evaluate(&self) -> Result<ScalarValue>;
+
+ /// Returns the allocated size of this accumulator in bytes, including `Self`.
+ ///
+ /// "Allocated" means that for internal containers such as `Vec`, the `capacity`
+ /// should be counted rather than the `len`.
+ fn size(&self) -> usize;
}
/// Representation of internal accumulator state. Accumulators can potentially have a mix of
diff --git a/datafusion/physical-expr/src/aggregate/approx_distinct.rs b/datafusion/physical-expr/src/aggregate/approx_distinct.rs
index 7302008ba..a2ac7f093 100644
--- a/datafusion/physical-expr/src/aggregate/approx_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_distinct.rs
@@ -239,6 +239,11 @@ macro_rules! default_accumulator_impl {
fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::UInt64(Some(self.hll.count() as u64)))
}
+
+ fn size(&self) -> usize {
+ // the HLL sketch has a static size, so the inline size of `Self` is the whole footprint
+ std::mem::size_of::<Self>()
+ }
};
}
diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
index 750675719..2b1a5da85 100644
--- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
@@ -355,6 +355,7 @@ impl ApproxPercentileAccumulator {
}
}
}
+
impl Accumulator for ApproxPercentileAccumulator {
fn state(&self) -> Result<Vec<AggregateState>> {
Ok(self
@@ -413,4 +414,9 @@ impl Accumulator for ApproxPercentileAccumulator {
Ok(())
}
+
+ fn size(&self) -> usize {
+ // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
+ // `TDigest::size` includes the `TDigest` struct itself, which `size_of_val(self)` has
+ // already counted as a field; subtract it so the digest is not counted twice
+ std::mem::size_of_val(self) + self.digest.size()
+ - std::mem::size_of_val(&self.digest)
+ }
}
diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs
index 85426015e..41f195f38 100644
--- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs
@@ -150,4 +150,10 @@ impl Accumulator for ApproxPercentileWithWeightAccumulator {
Ok(())
}
+
+ fn size(&self) -> usize {
+ // replace the inline size of the wrapped accumulator with its full reported size
+ let inner = &self.approx_percentile_cont_accumulator;
+ let inline_total = std::mem::size_of_val(self);
+ let inline_inner = std::mem::size_of_val(inner);
+
+ inline_total - inline_inner + inner.size()
+ }
}
diff --git a/datafusion/physical-expr/src/aggregate/array_agg.rs b/datafusion/physical-expr/src/aggregate/array_agg.rs
index 160e4477b..1eff48ce8 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg.rs
@@ -153,6 +153,12 @@ impl Accumulator for ArrayAggAccumulator {
self.datatype.clone(),
))
}
+
+ fn size(&self) -> usize {
+ // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
+ let inline_total = std::mem::size_of_val(self);
+ let values_total = ScalarValue::size_of_vec(&self.values);
+ // the `values` field is already part of `inline_total`, so take its inline part out again
+ let values_inline = std::mem::size_of_val(&self.values);
+
+ inline_total + values_total - values_inline
+ }
}
#[cfg(test)]
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
index a0ef021b8..bb32a9ffd 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
@@ -156,6 +156,12 @@ impl Accumulator for DistinctArrayAggAccumulator {
self.datatype.clone(),
))
}
+
+ fn size(&self) -> usize {
+ // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
+ let inline_total = std::mem::size_of_val(self);
+ let values_total = ScalarValue::size_of_hashset(&self.values);
+ // the `values` field is already part of `inline_total`, so take its inline part out again
+ let values_inline = std::mem::size_of_val(&self.values);
+
+ inline_total + values_total - values_inline
+ }
}
#[cfg(test)]
diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index f034e3d56..da70252d4 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -200,6 +200,10 @@ impl Accumulator for AvgAccumulator {
)),
}
}
+
+ fn size(&self) -> usize {
+ // swap the inline size of `sum` for the size it reports for itself
+ let inline_total = std::mem::size_of_val(self);
+ let sum_inline = std::mem::size_of_val(&self.sum);
+
+ inline_total - sum_inline + self.sum.size()
+ }
}
#[derive(Debug)]
diff --git a/datafusion/physical-expr/src/aggregate/correlation.rs b/datafusion/physical-expr/src/aggregate/correlation.rs
index 8645bd549..a013bd43a 100644
--- a/datafusion/physical-expr/src/aggregate/correlation.rs
+++ b/datafusion/physical-expr/src/aggregate/correlation.rs
@@ -193,6 +193,15 @@ impl Accumulator for CorrelationAccumulator {
Ok(ScalarValue::Float64(None))
}
+
+ fn size(&self) -> usize {
+ // bytes of `Self` that are not occupied by the three nested accumulators
+ let own = std::mem::size_of_val(self)
+ - std::mem::size_of_val(&self.covar)
+ - std::mem::size_of_val(&self.stddev1)
+ - std::mem::size_of_val(&self.stddev2);
+
+ // each nested accumulator reports its own size, including itself
+ own + self.covar.size() + self.stddev1.size() + self.stddev2.size()
+ }
}
#[cfg(test)]
diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
index b64328aa3..4721bf8f2 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -149,6 +149,10 @@ impl Accumulator for CountAccumulator {
fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.count)))
}
+
+ fn size(&self) -> usize {
+ // inline footprint of `Self` only; nothing heap-allocated is added here
+ std::mem::size_of::<Self>()
+ }
}
#[derive(Debug)]
diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs
index d4c0b4406..ead64ebd8 100644
--- a/datafusion/physical-expr/src/aggregate/count_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs
@@ -218,6 +218,20 @@ impl Accumulator for DistinctCountAccumulator {
))),
}
}
+
+ fn size(&self) -> usize {
+ // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
+ let inline_total = std::mem::size_of_val(self);
+ // slots reserved by the `values` container, whether occupied or not
+ let value_slots =
+ std::mem::size_of::<DistinctScalarValues>() * self.values.capacity();
+ // data behind each stored entry, minus the inline `Vec` that a slot already covers
+ let value_heap = self
+ .values
+ .iter()
+ .map(|entry| {
+ ScalarValue::size_of_vec(&entry.0) - std::mem::size_of_val(&entry.0)
+ })
+ .sum::<usize>();
+ let data_type_slots =
+ std::mem::size_of::<DataType>() * self.state_data_types.capacity();
+
+ inline_total + value_slots + value_heap + data_type_slots
+ }
}
#[cfg(test)]
diff --git a/datafusion/physical-expr/src/aggregate/covariance.rs b/datafusion/physical-expr/src/aggregate/covariance.rs
index 7797e008d..b5343059e 100644
--- a/datafusion/physical-expr/src/aggregate/covariance.rs
+++ b/datafusion/physical-expr/src/aggregate/covariance.rs
@@ -373,6 +373,10 @@ impl Accumulator for CovarianceAccumulator {
Ok(ScalarValue::Float64(Some(self.algo_const / count as f64)))
}
}
+
+ fn size(&self) -> usize {
+ // inline footprint of `Self` only; nothing heap-allocated is added here
+ std::mem::size_of::<Self>()
+ }
}
#[cfg(test)]
diff --git a/datafusion/physical-expr/src/aggregate/median.rs b/datafusion/physical-expr/src/aggregate/median.rs
index 968cd6986..deef5dec2 100644
--- a/datafusion/physical-expr/src/aggregate/median.rs
+++ b/datafusion/physical-expr/src/aggregate/median.rs
@@ -183,6 +183,20 @@ impl Accumulator for MedianAccumulator {
)),
}
}
+
+ fn size(&self) -> usize {
+ // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
+ // byte size of `Self` (not its alignment), plus the reserved slots and the arrays they hold
+ std::mem::size_of_val(self)
+ + (std::mem::size_of::<ArrayRef>() * self.all_values.capacity())
+ + self
+ .all_values
+ .iter()
+ .map(|array_ref| {
+ std::mem::size_of_val(array_ref.as_ref())
+ + array_ref.get_array_memory_size()
+ })
+ .sum::<usize>()
+ }
}
/// Create an empty array
diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs
index 2d6961bfc..73f898c42 100644
--- a/datafusion/physical-expr/src/aggregate/min_max.rs
+++ b/datafusion/physical-expr/src/aggregate/min_max.rs
@@ -571,6 +571,10 @@ impl Accumulator for MaxAccumulator {
fn evaluate(&self) -> Result<ScalarValue> {
Ok(self.max.clone())
}
+
+ fn size(&self) -> usize {
+ // swap the inline size of `max` for the size it reports for itself
+ let inline_total = std::mem::size_of_val(self);
+ let max_inline = std::mem::size_of_val(&self.max);
+
+ inline_total - max_inline + self.max.size()
+ }
}
#[derive(Debug)]
@@ -735,6 +739,10 @@ impl Accumulator for MinAccumulator {
fn evaluate(&self) -> Result<ScalarValue> {
Ok(self.min.clone())
}
+
+ fn size(&self) -> usize {
+ // swap the inline size of `min` for the size it reports for itself
+ let inline_total = std::mem::size_of_val(self);
+ let min_inline = std::mem::size_of_val(&self.min);
+
+ inline_total - min_inline + self.min.size()
+ }
}
#[derive(Debug)]
diff --git a/datafusion/physical-expr/src/aggregate/stddev.rs b/datafusion/physical-expr/src/aggregate/stddev.rs
index 639971811..94ec2418b 100644
--- a/datafusion/physical-expr/src/aggregate/stddev.rs
+++ b/datafusion/physical-expr/src/aggregate/stddev.rs
@@ -211,6 +211,11 @@ impl Accumulator for StddevAccumulator {
)),
}
}
+
+ fn size(&self) -> usize {
+ // byte sizes (not alignments): swap the inline `variance` field for the size it reports
+ std::mem::size_of_val(self) - std::mem::size_of_val(&self.variance)
+ + self.variance.size()
+ }
}
#[cfg(test)]
diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs
index 112856988..32259bc30 100644
--- a/datafusion/physical-expr/src/aggregate/sum.rs
+++ b/datafusion/physical-expr/src/aggregate/sum.rs
@@ -279,6 +279,10 @@ impl Accumulator for SumAccumulator {
Ok(self.sum.clone())
}
}
+
+ fn size(&self) -> usize {
+ // swap the inline size of `sum` for the size it reports for itself
+ let inline_total = std::mem::size_of_val(self);
+ let sum_inline = std::mem::size_of_val(&self.sum);
+
+ inline_total - sum_inline + self.sum.size()
+ }
}
#[derive(Debug)]
diff --git a/datafusion/physical-expr/src/aggregate/sum_distinct.rs b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
index 73c4828e8..95823c517 100644
--- a/datafusion/physical-expr/src/aggregate/sum_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
@@ -175,6 +175,12 @@ impl Accumulator for DistinctSumAccumulator {
}
Ok(sum_value)
}
+
+ fn size(&self) -> usize {
+ // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
+ let inline_total = std::mem::size_of_val(self);
+ let hash_values_total = ScalarValue::size_of_hashset(&self.hash_values);
+ // the `hash_values` field is already part of `inline_total`, so take its inline part out
+ let hash_values_inline = std::mem::size_of_val(&self.hash_values);
+
+ inline_total + hash_values_total - hash_values_inline
+ }
}
#[cfg(test)]
diff --git a/datafusion/physical-expr/src/aggregate/tdigest.rs b/datafusion/physical-expr/src/aggregate/tdigest.rs
index 977678e5b..c25566077 100644
--- a/datafusion/physical-expr/src/aggregate/tdigest.rs
+++ b/datafusion/physical-expr/src/aggregate/tdigest.rs
@@ -187,6 +187,12 @@ impl TDigest {
pub(crate) fn max_size(&self) -> usize {
self.max_size
}
+
+ /// Memory footprint in bytes: the `TDigest` struct itself plus the space reserved
+ /// (`capacity`, not `len`) by `centroids`.
+ pub(crate) fn size(&self) -> usize {
+ let centroid_bytes = std::mem::size_of::<Centroid>() * self.centroids.capacity();
+
+ std::mem::size_of_val(self) + centroid_bytes
+ }
}
impl Default for TDigest {
@@ -741,4 +747,12 @@ mod tests {
assert_error_bounds!(t, quantile = 0.5, want = 500.0);
assert_state_roundtrip!(t);
}
+
+ #[test]
+ fn test_size() {
+ // merge two values into an empty digest and pin the reported footprint
+ let empty = TDigest::new(10);
+ let merged = empty.merge_unsorted_f64(vec![0.0, 1.0]);
+
+ assert_eq!(merged.size(), 96);
+ }
}
diff --git a/datafusion/physical-expr/src/aggregate/variance.rs b/datafusion/physical-expr/src/aggregate/variance.rs
index d6ed8c957..8af810a9e 100644
--- a/datafusion/physical-expr/src/aggregate/variance.rs
+++ b/datafusion/physical-expr/src/aggregate/variance.rs
@@ -306,6 +306,10 @@ impl Accumulator for VarianceAccumulator {
Ok(ScalarValue::Float64(Some(self.m2 / count as f64)))
}
}
+
+ fn size(&self) -> usize {
+ // inline footprint of `Self` only; nothing heap-allocated is added here
+ std::mem::size_of::<Self>()
+ }
}
#[cfg(test)]
diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs
index 315fbd0dd..da61188f0 100644
--- a/datafusion/proto/src/lib.rs
+++ b/datafusion/proto/src/lib.rs
@@ -1226,6 +1226,10 @@ mod roundtrip_tests {
fn evaluate(&self) -> datafusion::error::Result<ScalarValue> {
Ok(ScalarValue::Float64(None))
}
+
+ fn size(&self) -> usize {
+ // inline footprint of `Self` only; nothing heap-allocated is added here
+ std::mem::size_of::<Self>()
+ }
}
let dummy_agg = create_udaf(