You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/07/05 18:48:42 UTC
[arrow-datafusion] branch main updated: Improve median performance. (#6837)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new c9a6fb8f5b Improve median performance. (#6837)
c9a6fb8f5b is described below
commit c9a6fb8f5b2b46f93642d7771254efa2831077e8
Author: vincev <vi...@gmail.com>
AuthorDate: Wed Jul 5 20:48:36 2023 +0200
Improve median performance. (#6837)
* Improve median performance.
* Fix formatting.
* Review feedback
* Renamed arrays size.
---
datafusion/physical-expr/src/aggregate/median.rs | 48 +++++++++++++++++++-----
1 file changed, 38 insertions(+), 10 deletions(-)
diff --git a/datafusion/physical-expr/src/aggregate/median.rs b/datafusion/physical-expr/src/aggregate/median.rs
index 6f79c98a6c..28f1fc3199 100644
--- a/datafusion/physical-expr/src/aggregate/median.rs
+++ b/datafusion/physical-expr/src/aggregate/median.rs
@@ -66,6 +66,7 @@ impl AggregateExpr for Median {
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(MedianAccumulator {
data_type: self.data_type.clone(),
+ arrays: vec![],
all_values: vec![],
}))
}
@@ -108,16 +109,21 @@ impl PartialEq<dyn Any> for Median {
/// The median accumulator accumulates the raw input values
/// as `ScalarValue`s
///
-/// The intermediate state is represented as a List of those scalars
+/// The intermediate state is represented as a List of scalar values updated by
+/// `merge_batch` and a `Vec` of `ArrayRef` that are converted to scalar values
+/// in the final evaluation step so that we avoid expensive conversions and
+/// allocations during `update_batch`.
struct MedianAccumulator {
data_type: DataType,
+ arrays: Vec<ArrayRef>,
all_values: Vec<ScalarValue>,
}
impl Accumulator for MedianAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
- let state =
- ScalarValue::new_list(Some(self.all_values.clone()), self.data_type.clone());
+ let all_values = to_scalar_values(&self.arrays)?;
+ let state = ScalarValue::new_list(Some(all_values), self.data_type.clone());
+
Ok(vec![state])
}
@@ -125,12 +131,9 @@ impl Accumulator for MedianAccumulator {
assert_eq!(values.len(), 1);
let array = &values[0];
+ // Defer conversions to scalar values to final evaluation.
assert_eq!(array.data_type(), &self.data_type);
- self.all_values.reserve(array.len());
- for index in 0..array.len() {
- self.all_values
- .push(ScalarValue::try_from_array(array, index)?);
- }
+ self.arrays.push(array.clone());
Ok(())
}
@@ -157,7 +160,14 @@ impl Accumulator for MedianAccumulator {
}
fn evaluate(&self) -> Result<ScalarValue> {
- if !self.all_values.iter().any(|v| !v.is_null()) {
+ let batch_values = to_scalar_values(&self.arrays)?;
+
+ if !self
+ .all_values
+ .iter()
+ .chain(batch_values.iter())
+ .any(|v| !v.is_null())
+ {
return ScalarValue::try_from(&self.data_type);
}
@@ -166,6 +176,7 @@ impl Accumulator for MedianAccumulator {
let array = ScalarValue::iter_to_array(
self.all_values
.iter()
+ .chain(batch_values.iter())
// ignore null values
.filter(|v| !v.is_null())
.cloned(),
@@ -214,13 +225,30 @@ impl Accumulator for MedianAccumulator {
}
fn size(&self) -> usize {
- std::mem::size_of_val(self) + ScalarValue::size_of_vec(&self.all_values)
+ let arrays_size: usize = self.arrays.iter().map(|a| a.len()).sum();
+
+ std::mem::size_of_val(self)
+ + ScalarValue::size_of_vec(&self.all_values)
+ + arrays_size
- std::mem::size_of_val(&self.all_values)
+ self.data_type.size()
- std::mem::size_of_val(&self.data_type)
}
}
+fn to_scalar_values(arrays: &[ArrayRef]) -> Result<Vec<ScalarValue>> {
+ let num_values: usize = arrays.iter().map(|a| a.len()).sum();
+ let mut all_values = Vec::with_capacity(num_values);
+
+ for array in arrays {
+ for index in 0..array.len() {
+ all_values.push(ScalarValue::try_from_array(&array, index)?);
+ }
+ }
+
+ Ok(all_values)
+}
+
/// Given a returns `array[indicies[indicie_index]]` as a `ScalarValue`
fn scalar_at_index(
array: &dyn Array,