You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by yj...@apache.org on 2022/04/13 01:02:41 UTC

[arrow-datafusion] branch master updated: Use `filter` (filter_record_batch) instead of `take` to avoid using indices (#2218)

This is an automated email from the ASF dual-hosted git repository.

yjshen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d75948b6 Use `filter` (filter_record_batch) instead of `take` to avoid using indices (#2218)
6d75948b6 is described below

commit 6d75948b6264526eac92a20a8dc211ca44df804f
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Wed Apr 13 03:02:35 2022 +0200

    Use `filter` (filter_record_batch) instead of `take` to avoid using indices (#2218)
    
    * Use `filter` instead of `take` to avoid using indices
    
    * Simplify / speed up by performing it on a record batch
    
    * Fix
    
    * Bring back special case
    
    * Handle selection array with nulls
    
    * Simplify
    
    * Use size of output from filter to check filter count
    
    * Simplify
    
    * Fix import in tests
    
    * Clippy
---
 datafusion/physical-expr/src/physical_expr.rs | 33 ++++++++-------------------
 1 file changed, 10 insertions(+), 23 deletions(-)

diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs
index 1f1da388d..fd9615db9 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use arrow::datatypes::{DataType, Schema};
-use arrow::error::Result as ArrowResult;
 
 use arrow::record_batch::RecordBatch;
 
@@ -25,12 +24,9 @@ use datafusion_common::Result;
 use datafusion_expr::ColumnarValue;
 use std::fmt::{Debug, Display};
 
-use arrow::array::{
-    make_array, Array, ArrayRef, BooleanArray, MutableArrayData, UInt64Array,
-};
-use arrow::compute::{and_kleene, is_not_null, take, SlicesIterator};
+use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
+use arrow::compute::{and_kleene, filter_record_batch, is_not_null, SlicesIterator};
 use std::any::Any;
-use std::sync::Arc;
 
 /// Expression that can be evaluated against a RecordBatch
 /// A Physical expression knows its type, nullability and how to evaluate itself.
@@ -51,24 +47,13 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug {
         batch: &RecordBatch,
         selection: &BooleanArray,
     ) -> Result<ColumnarValue> {
-        if selection.iter().all(|b| b == Some(true)) {
-            return self.evaluate(batch);
-        }
-        let mut indices = vec![];
-        for (i, b) in selection.iter().enumerate() {
-            if let Some(true) = b {
-                indices.push(i as u64);
-            }
-        }
-        let indices = UInt64Array::from_iter_values(indices);
-        let tmp_columns = batch
-            .columns()
-            .iter()
-            .map(|c| take(c.as_ref(), &indices, None))
-            .collect::<ArrowResult<Vec<Arc<dyn Array>>>>()?;
-
-        let tmp_batch = RecordBatch::try_new(batch.schema(), tmp_columns)?;
+        let tmp_batch = filter_record_batch(batch, selection)?;
+
         let tmp_result = self.evaluate(&tmp_batch)?;
+        // All values from the `selection` filter are true.
+        if batch.num_rows() == tmp_batch.num_rows() {
+            return Ok(tmp_result);
+        }
         if let ColumnarValue::Array(a) = tmp_result {
             let result = scatter(selection, a.as_ref())?;
             Ok(ColumnarValue::Array(result))
@@ -123,6 +108,8 @@ fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use super::*;
     use arrow::array::Int32Array;
     use datafusion_common::Result;