You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/25 10:38:26 UTC

[arrow-datafusion] branch master updated: add `{TDigest,ScalarValue,Accumulator}::size` (#4342)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new e0150439e add `{TDigest,ScalarValue,Accumulator}::size` (#4342)
e0150439e is described below

commit e0150439e89e1d98ff362bddcafdb980ee499a7a
Author: Marco Neumann <ma...@crepererum.net>
AuthorDate: Fri Nov 25 10:38:21 2022 +0000

    add `{TDigest,ScalarValue,Accumulator}::size` (#4342)
    
    * feat: add `TDigest::size`
    
    * feat: add `ScalarValue::size`
    
    * feat: add `Accumulator::size`
    
    * docs: improve
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * refactor: simplify memory accounting
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 datafusion-examples/examples/simple_udaf.rs        |  4 +
 datafusion/common/src/scalar.rs                    | 89 ++++++++++++++++++++++
 datafusion/core/tests/user_defined_aggregates.rs   |  4 +
 datafusion/expr/src/accumulator.rs                 |  5 ++
 .../physical-expr/src/aggregate/approx_distinct.rs |  5 ++
 .../src/aggregate/approx_percentile_cont.rs        |  6 ++
 .../approx_percentile_cont_with_weight.rs          |  6 ++
 .../physical-expr/src/aggregate/array_agg.rs       |  6 ++
 .../src/aggregate/array_agg_distinct.rs            |  6 ++
 datafusion/physical-expr/src/aggregate/average.rs  |  4 +
 .../physical-expr/src/aggregate/correlation.rs     |  9 +++
 datafusion/physical-expr/src/aggregate/count.rs    |  4 +
 .../physical-expr/src/aggregate/count_distinct.rs  | 14 ++++
 .../physical-expr/src/aggregate/covariance.rs      |  4 +
 datafusion/physical-expr/src/aggregate/median.rs   | 14 ++++
 datafusion/physical-expr/src/aggregate/min_max.rs  |  8 ++
 datafusion/physical-expr/src/aggregate/stddev.rs   |  5 ++
 datafusion/physical-expr/src/aggregate/sum.rs      |  4 +
 .../physical-expr/src/aggregate/sum_distinct.rs    |  6 ++
 datafusion/physical-expr/src/aggregate/tdigest.rs  | 14 ++++
 datafusion/physical-expr/src/aggregate/variance.rs |  4 +
 datafusion/proto/src/lib.rs                        |  4 +
 22 files changed, 225 insertions(+)

diff --git a/datafusion-examples/examples/simple_udaf.rs b/datafusion-examples/examples/simple_udaf.rs
index 416d514e3..bb3a42b8b 100644
--- a/datafusion-examples/examples/simple_udaf.rs
+++ b/datafusion-examples/examples/simple_udaf.rs
@@ -153,6 +153,10 @@ impl Accumulator for GeometricMean {
             self.merge(&v)
         })
     }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+    }
 }
 
 #[tokio::main]
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 96d1ab672..9a1119469 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -19,6 +19,7 @@
 
 use std::borrow::Borrow;
 use std::cmp::{max, Ordering};
+use std::collections::HashSet;
 use std::convert::{Infallible, TryInto};
 use std::ops::{Add, Sub};
 use std::str::FromStr;
@@ -2290,6 +2291,94 @@ impl ScalarValue {
             ScalarValue::Null => array.data().is_null(index),
         }
     }
+
+    /// Estimate size in bytes, including `Self`. For values with internal containers such as
+    /// `String`, counts the allocated size (`capacity`) rather than the current length (`len`).
+    pub fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+            + match self {
+                ScalarValue::Null
+                | ScalarValue::Boolean(_)
+                | ScalarValue::Float32(_)
+                | ScalarValue::Float64(_)
+                | ScalarValue::Decimal128(_, _, _)
+                | ScalarValue::Int8(_)
+                | ScalarValue::Int16(_)
+                | ScalarValue::Int32(_)
+                | ScalarValue::Int64(_)
+                | ScalarValue::UInt8(_)
+                | ScalarValue::UInt16(_)
+                | ScalarValue::UInt32(_)
+                | ScalarValue::UInt64(_)
+                | ScalarValue::Date32(_)
+                | ScalarValue::Date64(_)
+                | ScalarValue::Time32Second(_)
+                | ScalarValue::Time32Millisecond(_)
+                | ScalarValue::Time64Microsecond(_)
+                | ScalarValue::Time64Nanosecond(_)
+                | ScalarValue::IntervalYearMonth(_)
+                | ScalarValue::IntervalDayTime(_)
+                | ScalarValue::IntervalMonthDayNano(_) => 0,
+                ScalarValue::Utf8(s)
+                | ScalarValue::LargeUtf8(s)
+                | ScalarValue::TimestampSecond(_, s)
+                | ScalarValue::TimestampMillisecond(_, s)
+                | ScalarValue::TimestampMicrosecond(_, s)
+                | ScalarValue::TimestampNanosecond(_, s) => {
+                    s.as_ref().map(|s| s.capacity()).unwrap_or_default()
+                }
+                ScalarValue::Binary(b)
+                | ScalarValue::FixedSizeBinary(_, b)
+                | ScalarValue::LargeBinary(b) => {
+                    b.as_ref().map(|b| b.capacity()).unwrap_or_default()
+                }
+                // TODO(crepererum): `Field` is NOT fixed size, add `Field::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
+                ScalarValue::List(vals, field) => {
+                    vals.as_ref()
+                        .map(|vals| Self::size_of_vec(vals) - std::mem::size_of_val(vals))
+                        .unwrap_or_default()
+                        + std::mem::size_of_val(field)
+                }
+                // TODO(crepererum): `Field` is NOT fixed size, add `Field::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
+                ScalarValue::Struct(vals, fields) => {
+                    vals.as_ref()
+                        .map(|vals| {
+                            vals.iter()
+                                .map(|sv| sv.size() - std::mem::size_of_val(sv))
+                                .sum::<usize>()
+                                + (std::mem::size_of::<ScalarValue>() * vals.capacity())
+                        })
+                        .unwrap_or_default()
+                        + (std::mem::size_of::<Field>() * fields.capacity())
+                }
+                // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
+                ScalarValue::Dictionary(dt, sv) => {
+                    std::mem::size_of_val(dt.as_ref()) + sv.size()
+                }
+            }
+    }
+
+    /// Estimates [size](Self::size) of [`Vec`] in bytes, counting `capacity` rather than `len`.
+    /// Includes the size of the [`Vec`] container itself (callers subtract it when it is inline).
+    pub fn size_of_vec(vec: &Vec<Self>) -> usize {
+        std::mem::size_of_val(vec)
+            + (std::mem::size_of::<ScalarValue>() * vec.capacity())
+            + vec
+                .iter()
+                .map(|sv| sv.size() - std::mem::size_of_val(sv))
+                .sum::<usize>()
+    }
+
+    /// Estimates [size](Self::size) of [`HashSet`] in bytes, counting `capacity` rather than `len`.
+    /// Includes the size of the [`HashSet`] container itself (callers subtract it when inline).
+    pub fn size_of_hashset<S>(set: &HashSet<Self, S>) -> usize {
+        std::mem::size_of_val(set)
+            + (std::mem::size_of::<ScalarValue>() * set.capacity())
+            + set
+                .iter()
+                .map(|sv| sv.size() - std::mem::size_of_val(sv))
+                .sum::<usize>()
+    }
 }
 
 macro_rules! impl_scalar {
diff --git a/datafusion/core/tests/user_defined_aggregates.rs b/datafusion/core/tests/user_defined_aggregates.rs
index ea6838e70..2903d4272 100644
--- a/datafusion/core/tests/user_defined_aggregates.rs
+++ b/datafusion/core/tests/user_defined_aggregates.rs
@@ -244,4 +244,8 @@ impl Accumulator for FirstSelector {
         // same logic is needed as in update_batch
         self.update_batch(states)
     }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+    }
 }
diff --git a/datafusion/expr/src/accumulator.rs b/datafusion/expr/src/accumulator.rs
index d0540f501..3d107c0c4 100644
--- a/datafusion/expr/src/accumulator.rs
+++ b/datafusion/expr/src/accumulator.rs
@@ -54,6 +54,11 @@ pub trait Accumulator: Send + Sync + Debug {
 
     /// returns its value based on its current state.
     fn evaluate(&self) -> Result<ScalarValue>;
+
+    /// Allocated size required for this accumulator, in bytes, including `Self`.
+    /// "Allocated" means that for internal containers such as `Vec`, the `capacity`
+    /// should be counted rather than the `len`.
+    fn size(&self) -> usize;
 }
 
 /// Representation of internal accumulator state. Accumulators can potentially have a mix of
diff --git a/datafusion/physical-expr/src/aggregate/approx_distinct.rs b/datafusion/physical-expr/src/aggregate/approx_distinct.rs
index 7302008ba..a2ac7f093 100644
--- a/datafusion/physical-expr/src/aggregate/approx_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_distinct.rs
@@ -239,6 +239,11 @@ macro_rules! default_accumulator_impl {
         fn evaluate(&self) -> Result<ScalarValue> {
             Ok(ScalarValue::UInt64(Some(self.hll.count() as u64)))
         }
+
+        fn size(&self) -> usize {
+            // HLL has static size
+            std::mem::size_of_val(self)
+        }
     };
 }
 
diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
index 750675719..2b1a5da85 100644
--- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
@@ -355,6 +355,7 @@ impl ApproxPercentileAccumulator {
         }
     }
 }
+
 impl Accumulator for ApproxPercentileAccumulator {
     fn state(&self) -> Result<Vec<AggregateState>> {
         Ok(self
@@ -413,4 +414,9 @@ impl Accumulator for ApproxPercentileAccumulator {
 
         Ok(())
     }
+
+    fn size(&self) -> usize {
+        // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
+        std::mem::size_of_val(self) + self.digest.size()
+    }
 }
diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs
index 85426015e..41f195f38 100644
--- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs
@@ -150,4 +150,10 @@ impl Accumulator for ApproxPercentileWithWeightAccumulator {
 
         Ok(())
     }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+            - std::mem::size_of_val(&self.approx_percentile_cont_accumulator)
+            + self.approx_percentile_cont_accumulator.size()
+    }
 }
diff --git a/datafusion/physical-expr/src/aggregate/array_agg.rs b/datafusion/physical-expr/src/aggregate/array_agg.rs
index 160e4477b..1eff48ce8 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg.rs
@@ -153,6 +153,12 @@ impl Accumulator for ArrayAggAccumulator {
             self.datatype.clone(),
         ))
     }
+
+    fn size(&self) -> usize {
+        // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
+        std::mem::size_of_val(self) + ScalarValue::size_of_vec(&self.values)
+            - std::mem::size_of_val(&self.values)
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
index a0ef021b8..bb32a9ffd 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
@@ -156,6 +156,12 @@ impl Accumulator for DistinctArrayAggAccumulator {
             self.datatype.clone(),
         ))
     }
+
+    fn size(&self) -> usize {
+        // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
+        std::mem::size_of_val(self) + ScalarValue::size_of_hashset(&self.values)
+            - std::mem::size_of_val(&self.values)
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index f034e3d56..da70252d4 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -200,6 +200,10 @@ impl Accumulator for AvgAccumulator {
             )),
         }
     }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
+    }
 }
 
 #[derive(Debug)]
diff --git a/datafusion/physical-expr/src/aggregate/correlation.rs b/datafusion/physical-expr/src/aggregate/correlation.rs
index 8645bd549..a013bd43a 100644
--- a/datafusion/physical-expr/src/aggregate/correlation.rs
+++ b/datafusion/physical-expr/src/aggregate/correlation.rs
@@ -193,6 +193,15 @@ impl Accumulator for CorrelationAccumulator {
 
         Ok(ScalarValue::Float64(None))
     }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self) - std::mem::size_of_val(&self.covar)
+            + self.covar.size()
+            - std::mem::size_of_val(&self.stddev1)
+            + self.stddev1.size()
+            - std::mem::size_of_val(&self.stddev2)
+            + self.stddev2.size()
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
index b64328aa3..4721bf8f2 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -149,6 +149,10 @@ impl Accumulator for CountAccumulator {
     fn evaluate(&self) -> Result<ScalarValue> {
         Ok(ScalarValue::Int64(Some(self.count)))
     }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+    }
 }
 
 #[derive(Debug)]
diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs
index d4c0b4406..ead64ebd8 100644
--- a/datafusion/physical-expr/src/aggregate/count_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs
@@ -218,6 +218,20 @@ impl Accumulator for DistinctCountAccumulator {
             ))),
         }
     }
+
+    fn size(&self) -> usize {
+        // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
+        std::mem::size_of_val(self)
+            + (std::mem::size_of::<DistinctScalarValues>() * self.values.capacity())
+            + self
+                .values
+                .iter()
+                .map(|vals| {
+                    ScalarValue::size_of_vec(&vals.0) - std::mem::size_of_val(&vals.0)
+                })
+                .sum::<usize>()
+            + (std::mem::size_of::<DataType>() * self.state_data_types.capacity())
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/physical-expr/src/aggregate/covariance.rs b/datafusion/physical-expr/src/aggregate/covariance.rs
index 7797e008d..b5343059e 100644
--- a/datafusion/physical-expr/src/aggregate/covariance.rs
+++ b/datafusion/physical-expr/src/aggregate/covariance.rs
@@ -373,6 +373,10 @@ impl Accumulator for CovarianceAccumulator {
             Ok(ScalarValue::Float64(Some(self.algo_const / count as f64)))
         }
     }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/physical-expr/src/aggregate/median.rs b/datafusion/physical-expr/src/aggregate/median.rs
index 968cd6986..deef5dec2 100644
--- a/datafusion/physical-expr/src/aggregate/median.rs
+++ b/datafusion/physical-expr/src/aggregate/median.rs
@@ -183,6 +183,20 @@ impl Accumulator for MedianAccumulator {
             )),
         }
     }
+
+    fn size(&self) -> usize {
+        // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
+        std::mem::size_of_val(self)
+            + (std::mem::size_of::<ArrayRef>() * self.all_values.capacity())
+            + self
+                .all_values
+                .iter()
+                .map(|array_ref| {
+                    std::mem::size_of_val(array_ref.as_ref())
+                        + array_ref.get_array_memory_size()
+                })
+                .sum::<usize>()
+    }
 }
 
 /// Create an empty array
diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs
index 2d6961bfc..73f898c42 100644
--- a/datafusion/physical-expr/src/aggregate/min_max.rs
+++ b/datafusion/physical-expr/src/aggregate/min_max.rs
@@ -571,6 +571,10 @@ impl Accumulator for MaxAccumulator {
     fn evaluate(&self) -> Result<ScalarValue> {
         Ok(self.max.clone())
     }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self) - std::mem::size_of_val(&self.max) + self.max.size()
+    }
 }
 
 #[derive(Debug)]
@@ -735,6 +739,10 @@ impl Accumulator for MinAccumulator {
     fn evaluate(&self) -> Result<ScalarValue> {
         Ok(self.min.clone())
     }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self) - std::mem::size_of_val(&self.min) + self.min.size()
+    }
 }
 
 #[derive(Debug)]
diff --git a/datafusion/physical-expr/src/aggregate/stddev.rs b/datafusion/physical-expr/src/aggregate/stddev.rs
index 639971811..94ec2418b 100644
--- a/datafusion/physical-expr/src/aggregate/stddev.rs
+++ b/datafusion/physical-expr/src/aggregate/stddev.rs
@@ -211,6 +211,11 @@ impl Accumulator for StddevAccumulator {
             )),
         }
     }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self) - std::mem::size_of_val(&self.variance)
+            + self.variance.size()
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs
index 112856988..32259bc30 100644
--- a/datafusion/physical-expr/src/aggregate/sum.rs
+++ b/datafusion/physical-expr/src/aggregate/sum.rs
@@ -279,6 +279,10 @@ impl Accumulator for SumAccumulator {
             Ok(self.sum.clone())
         }
     }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
+    }
 }
 
 #[derive(Debug)]
diff --git a/datafusion/physical-expr/src/aggregate/sum_distinct.rs b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
index 73c4828e8..95823c517 100644
--- a/datafusion/physical-expr/src/aggregate/sum_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
@@ -175,6 +175,12 @@ impl Accumulator for DistinctSumAccumulator {
         }
         Ok(sum_value)
     }
+
+    fn size(&self) -> usize {
+        // TODO(crepererum): `DataType` is NOT fixed size, add `DataType::size` method to arrow (https://github.com/apache/arrow-rs/issues/3147)
+        std::mem::size_of_val(self) + ScalarValue::size_of_hashset(&self.hash_values)
+            - std::mem::size_of_val(&self.hash_values)
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/physical-expr/src/aggregate/tdigest.rs b/datafusion/physical-expr/src/aggregate/tdigest.rs
index 977678e5b..c25566077 100644
--- a/datafusion/physical-expr/src/aggregate/tdigest.rs
+++ b/datafusion/physical-expr/src/aggregate/tdigest.rs
@@ -187,6 +187,12 @@ impl TDigest {
     pub(crate) fn max_size(&self) -> usize {
         self.max_size
     }
+
+    /// Size in bytes including `Self`.
+    pub(crate) fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+            + (std::mem::size_of::<Centroid>() * self.centroids.capacity())
+    }
 }
 
 impl Default for TDigest {
@@ -741,4 +747,12 @@ mod tests {
         assert_error_bounds!(t, quantile = 0.5, want = 500.0);
         assert_state_roundtrip!(t);
     }
+
+    #[test]
+    fn test_size() {
+        let t = TDigest::new(10);
+        let t = t.merge_unsorted_f64(vec![0.0, 1.0]);
+
+        assert_eq!(t.size(), 96);
+    }
 }
diff --git a/datafusion/physical-expr/src/aggregate/variance.rs b/datafusion/physical-expr/src/aggregate/variance.rs
index d6ed8c957..8af810a9e 100644
--- a/datafusion/physical-expr/src/aggregate/variance.rs
+++ b/datafusion/physical-expr/src/aggregate/variance.rs
@@ -306,6 +306,10 @@ impl Accumulator for VarianceAccumulator {
             Ok(ScalarValue::Float64(Some(self.m2 / count as f64)))
         }
     }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs
index 315fbd0dd..da61188f0 100644
--- a/datafusion/proto/src/lib.rs
+++ b/datafusion/proto/src/lib.rs
@@ -1226,6 +1226,10 @@ mod roundtrip_tests {
             fn evaluate(&self) -> datafusion::error::Result<ScalarValue> {
                 Ok(ScalarValue::Float64(None))
             }
+
+            fn size(&self) -> usize {
+                std::mem::size_of_val(self)
+            }
         }
 
         let dummy_agg = create_udaf(