You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "vincev (via GitHub)" <gi...@apache.org> on 2023/07/04 08:44:08 UTC

[GitHub] [arrow-datafusion] vincev opened a new pull request, #6837: Improve median performance.

vincev opened a new pull request, #6837:
URL: https://github.com/apache/arrow-datafusion/pull/6837

   # Which issue does this PR close?
   
   Related to discussion in #4973.
   
   # Rationale for this change
   
   This PR improves the median aggregator by reducing the number of allocations. 
   
   For this example I am using one year of data from the [nyctaxi](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page) dataset, running  a group by `payment_type` and computing the `total_amount` median with the current version it takes ~22secs:
   
   ```
   $ time ./target/release/datafusion-median
   +--------------+--------------+----------+
   | payment_type | total_amount | n        |
   +--------------+--------------+----------+
   | 5            | 5.275        | 6        |
   | 3            | 7.7          | 194323   |
   | 4            | -6.8         | 244364   |
   | 0            | 23.0         | 1368303  |
   | 2            | 13.3         | 7763339  |
   | 1            | 16.56        | 30085763 |
   +--------------+--------------+----------+
   
   real    0m22.861s
   user    1m54.033s
   sys     0m2.872s
   ```
   
   with the changes introduced in this PR we get the same result in 2secs:
   
   ```
   $ time ./target/release/datafusion-median
   +--------------+--------------+----------+
   | payment_type | total_amount | n        |
   +--------------+--------------+----------+
   | 5            | 5.275        | 6        |
   | 3            | 7.7          | 194323   |
   | 4            | -6.8         | 244364   |
   | 0            | 23.0         | 1368303  |
   | 2            | 13.3         | 7763339  |
   | 1            | 16.56        | 30085763 |
   +--------------+--------------+----------+
   
   real    0m2.514s
   user    0m4.725s
   sys     0m1.765s
   ```
   
   
   # What changes are included in this PR?
   
   Reduce number of allocations.
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   # Are these changes tested?
   
   Run tests locally they all passed.
   <!--
   We typically require tests for all PRs in order to:
   1. Prevent the code from being accidentally broken by subsequent changes
   2. Serve as another way to document the expected behavior of the code
   
   If tests are not included in your PR, please explain why (for example, are they covered by existing tests)?
   -->
   
   # Are there any user-facing changes?
   
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] vincev commented on a diff in pull request #6837: Improve median performance.

Posted by "vincev (via GitHub)" <gi...@apache.org>.
vincev commented on code in PR #6837:
URL: https://github.com/apache/arrow-datafusion/pull/6837#discussion_r1252024601


##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -111,13 +112,28 @@ impl PartialEq<dyn Any> for Median {
 /// The intermediate state is represented as a List of those scalars
 struct MedianAccumulator {
     data_type: DataType,
+    batches: Vec<ArrayRef>,
     all_values: Vec<ScalarValue>,
 }
 
+fn to_scalar_values(arrays: &[ArrayRef]) -> Result<Vec<ScalarValue>> {
+    let num_values: usize = arrays.iter().map(|a| a.len()).sum();
+    let mut all_values = Vec::with_capacity(num_values);
+
+    for array in arrays {
+        for index in 0..array.len() {
+            all_values.push(ScalarValue::try_from_array(&array, index)?);
+        }
+    }
+
+    Ok(all_values)
+}
+
 impl Accumulator for MedianAccumulator {
     fn state(&self) -> Result<Vec<ScalarValue>> {
-        let state =
-            ScalarValue::new_list(Some(self.all_values.clone()), self.data_type.clone());
+        let all_values = to_scalar_values(&self.batches)?;
+        let state = ScalarValue::new_list(Some(all_values), self.data_type.clone());

Review Comment:
   That would work well, the reason I was suggesting adding an `Array` variant instead of changing `List` was to avoid changing all the code that depends on `List(Option<Vec<ScalarValue>>, FieldRef)`, but yes long term that would probably be best. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6837: Improve median performance.

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6837:
URL: https://github.com/apache/arrow-datafusion/pull/6837#discussion_r1252006197


##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -111,13 +112,28 @@ impl PartialEq<dyn Any> for Median {
 /// The intermediate state is represented as a List of those scalars
 struct MedianAccumulator {
     data_type: DataType,
+    batches: Vec<ArrayRef>,
     all_values: Vec<ScalarValue>,
 }
 
+fn to_scalar_values(arrays: &[ArrayRef]) -> Result<Vec<ScalarValue>> {
+    let num_values: usize = arrays.iter().map(|a| a.len()).sum();
+    let mut all_values = Vec::with_capacity(num_values);
+
+    for array in arrays {
+        for index in 0..array.len() {
+            all_values.push(ScalarValue::try_from_array(&array, index)?);
+        }
+    }
+
+    Ok(all_values)
+}
+
 impl Accumulator for MedianAccumulator {
     fn state(&self) -> Result<Vec<ScalarValue>> {
-        let state =
-            ScalarValue::new_list(Some(self.all_values.clone()), self.data_type.clone());
+        let all_values = to_scalar_values(&self.batches)?;
+        let state = ScalarValue::new_list(Some(all_values), self.data_type.clone());

Review Comment:
   Maybe @Dandandan  was suggesting
   
   ```rust
   impl ScalarValue {
     ...
     List(ArrayRef)
     ...
   }
   ```
   
   In general this would work well with the approach @tustvold  is working on upstream in arrow-rs with `Datum` -- https://github.com/apache/arrow-rs/pull/4393



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] Dandandan commented on a diff in pull request #6837: Improve median performance.

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #6837:
URL: https://github.com/apache/arrow-datafusion/pull/6837#discussion_r1251736845


##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -111,13 +112,28 @@ impl PartialEq<dyn Any> for Median {
 /// The intermediate state is represented as a List of those scalars
 struct MedianAccumulator {
     data_type: DataType,
+    batches: Vec<ArrayRef>,
     all_values: Vec<ScalarValue>,
 }
 
+fn to_scalar_values(arrays: &[ArrayRef]) -> Result<Vec<ScalarValue>> {
+    let num_values: usize = arrays.iter().map(|a| a.len()).sum();
+    let mut all_values = Vec::with_capacity(num_values);
+
+    for array in arrays {
+        for index in 0..array.len() {
+            all_values.push(ScalarValue::try_from_array(&array, index)?);
+        }
+    }
+
+    Ok(all_values)
+}
+
 impl Accumulator for MedianAccumulator {
     fn state(&self) -> Result<Vec<ScalarValue>> {
-        let state =
-            ScalarValue::new_list(Some(self.all_values.clone()), self.data_type.clone());
+        let all_values = to_scalar_values(&self.batches)?;
+        let state = ScalarValue::new_list(Some(all_values), self.data_type.clone());

Review Comment:
   I wonder if we can change `ScalarValue::List` to use `ArrayRef` instead of `Vec<ScalarValue>` internally.
   This would avoid quite some expensive conversions from/to `ScalarValue`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] Dandandan commented on a diff in pull request #6837: Improve median performance.

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #6837:
URL: https://github.com/apache/arrow-datafusion/pull/6837#discussion_r1252016225


##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -111,13 +112,28 @@ impl PartialEq<dyn Any> for Median {
 /// The intermediate state is represented as a List of those scalars
 struct MedianAccumulator {
     data_type: DataType,
+    batches: Vec<ArrayRef>,
     all_values: Vec<ScalarValue>,
 }
 
+fn to_scalar_values(arrays: &[ArrayRef]) -> Result<Vec<ScalarValue>> {
+    let num_values: usize = arrays.iter().map(|a| a.len()).sum();
+    let mut all_values = Vec::with_capacity(num_values);
+
+    for array in arrays {
+        for index in 0..array.len() {
+            all_values.push(ScalarValue::try_from_array(&array, index)?);
+        }
+    }
+
+    Ok(all_values)
+}
+
 impl Accumulator for MedianAccumulator {
     fn state(&self) -> Result<Vec<ScalarValue>> {
-        let state =
-            ScalarValue::new_list(Some(self.all_values.clone()), self.data_type.clone());
+        let all_values = to_scalar_values(&self.batches)?;
+        let state = ScalarValue::new_list(Some(all_values), self.data_type.clone());

Review Comment:
   Yeah that's what I was trying to suggest - this would avoid the need to convert / allocate to individual `ScalarValue`s and convert to `Array` later on again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6837: Improve median performance.

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6837:
URL: https://github.com/apache/arrow-datafusion/pull/6837#discussion_r1253351668


##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -126,11 +142,7 @@ impl Accumulator for MedianAccumulator {
         let array = &values[0];
 
         assert_eq!(array.data_type(), &self.data_type);
-        self.all_values.reserve(array.len());
-        for index in 0..array.len() {
-            self.all_values
-                .push(ScalarValue::try_from_array(array, index)?);
-        }
+        self.batches.push(array.clone());

Review Comment:
   ```suggestion
           // defer conversion to final evaluation 
           self.batches.push(array.clone());
   ```



##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -111,13 +112,28 @@ impl PartialEq<dyn Any> for Median {
 /// The intermediate state is represented as a List of those scalars
 struct MedianAccumulator {
     data_type: DataType,
+    batches: Vec<ArrayRef>,
     all_values: Vec<ScalarValue>,
 }
 
+fn to_scalar_values(arrays: &[ArrayRef]) -> Result<Vec<ScalarValue>> {
+    let num_values: usize = arrays.iter().map(|a| a.len()).sum();
+    let mut all_values = Vec::with_capacity(num_values);
+
+    for array in arrays {
+        for index in 0..array.len() {
+            all_values.push(ScalarValue::try_from_array(&array, index)?);
+        }
+    }
+
+    Ok(all_values)
+}
+
 impl Accumulator for MedianAccumulator {
     fn state(&self) -> Result<Vec<ScalarValue>> {
-        let state =
-            ScalarValue::new_list(Some(self.all_values.clone()), self.data_type.clone());
+        let all_values = to_scalar_values(&self.batches)?;
+        let state = ScalarValue::new_list(Some(all_values), self.data_type.clone());

Review Comment:
   After thinking about this some more, I think the most performant thing to do will be to implement a native `GroupsAccumulator` (aka #6800 ) for median. With sufficient effort we could make median be very fast -- so I think this is a good improvement for now



##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -111,13 +112,28 @@ impl PartialEq<dyn Any> for Median {
 /// The intermediate state is represented as a List of those scalars

Review Comment:
   Can you please update the comments to reflect the change here (aka that the state is is stored as ScalarValues, but the conversion is delayed if possible)



##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -111,13 +112,28 @@ impl PartialEq<dyn Any> for Median {
 /// The intermediate state is represented as a List of those scalars
 struct MedianAccumulator {
     data_type: DataType,
+    batches: Vec<ArrayRef>,
     all_values: Vec<ScalarValue>,
 }
 
+fn to_scalar_values(arrays: &[ArrayRef]) -> Result<Vec<ScalarValue>> {
+    let num_values: usize = arrays.iter().map(|a| a.len()).sum();

Review Comment:
   💯 for computing the capacity up front



##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -111,13 +112,28 @@ impl PartialEq<dyn Any> for Median {
 /// The intermediate state is represented as a List of those scalars
 struct MedianAccumulator {
     data_type: DataType,
+    batches: Vec<ArrayRef>,

Review Comment:
   Nit is that this is actually arrays (not batches)  -- maybe `arrays` is a better name



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] vincev commented on pull request #6837: Improve median performance.

Posted by "vincev (via GitHub)" <gi...@apache.org>.
vincev commented on PR #6837:
URL: https://github.com/apache/arrow-datafusion/pull/6837#issuecomment-1622297922

   Thank you @alamb, @Dandandan for your feedback and review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb merged pull request #6837: Improve median performance.

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #6837:
URL: https://github.com/apache/arrow-datafusion/pull/6837


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #6837: Improve median performance.

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6837:
URL: https://github.com/apache/arrow-datafusion/pull/6837#issuecomment-1622292954

   Thanks @vincev 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] vincev commented on a diff in pull request #6837: Improve median performance.

Posted by "vincev (via GitHub)" <gi...@apache.org>.
vincev commented on code in PR #6837:
URL: https://github.com/apache/arrow-datafusion/pull/6837#discussion_r1251745265


##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -111,13 +112,28 @@ impl PartialEq<dyn Any> for Median {
 /// The intermediate state is represented as a List of those scalars
 struct MedianAccumulator {
     data_type: DataType,
+    batches: Vec<ArrayRef>,
     all_values: Vec<ScalarValue>,
 }
 
+fn to_scalar_values(arrays: &[ArrayRef]) -> Result<Vec<ScalarValue>> {
+    let num_values: usize = arrays.iter().map(|a| a.len()).sum();
+    let mut all_values = Vec::with_capacity(num_values);
+
+    for array in arrays {
+        for index in 0..array.len() {
+            all_values.push(ScalarValue::try_from_array(&array, index)?);
+        }
+    }
+
+    Ok(all_values)
+}
+
 impl Accumulator for MedianAccumulator {
     fn state(&self) -> Result<Vec<ScalarValue>> {
-        let state =
-            ScalarValue::new_list(Some(self.all_values.clone()), self.data_type.clone());
+        let all_values = to_scalar_values(&self.batches)?;
+        let state = ScalarValue::new_list(Some(all_values), self.data_type.clone());

Review Comment:
   Yes that would help a lot, maybe we can add a `ScalarValue::Array` variant to `ScalarValue`. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org