You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink and is not preserved in this plain-text version.
Posted to commits@arrow.apache.org by yj...@apache.org on 2022/04/07 10:31:48 UTC

[arrow-datafusion] branch master updated: minor: Avoid per cell evaluation in Coalesce, use zip in CaseWhen (#2171)

This is an automated email from the ASF dual-hosted git repository.

yjshen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 0da1f370f minor: Avoid per cell evaluation in Coalesce, use zip in CaseWhen (#2171)
0da1f370f is described below

commit 0da1f370f36a04ce1648c195e0bfe8e03a1c2c2b
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Thu Apr 7 18:31:43 2022 +0800

    minor: Avoid per cell evaluation in Coalesce, use zip in CaseWhen (#2171)
    
    * Avoid per cell evaluation in Coalesce, use zip in CaseWhen
    
    * Stop early in coalesce once every row has been assigned a value
---
 .../physical-expr/src/conditional_expressions.rs   |  63 +++++----
 datafusion/physical-expr/src/expressions/case.rs   | 150 +--------------------
 2 files changed, 44 insertions(+), 169 deletions(-)

diff --git a/datafusion/physical-expr/src/conditional_expressions.rs b/datafusion/physical-expr/src/conditional_expressions.rs
index 557d096c1..4e8675f8e 100644
--- a/datafusion/physical-expr/src/conditional_expressions.rs
+++ b/datafusion/physical-expr/src/conditional_expressions.rs
@@ -15,11 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
-
 use arrow::array::{new_null_array, Array, BooleanArray};
-use arrow::compute;
 use arrow::compute::kernels::zip::zip;
+use arrow::compute::{and, is_not_null, is_null};
 use arrow::datatypes::DataType;
 
 use datafusion_common::{DataFusionError, Result};
@@ -35,36 +33,51 @@ pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
         )));
     }
 
-    let size = match args[0] {
-        ColumnarValue::Array(ref a) => a.len(),
-        ColumnarValue::Scalar(ref _s) => 1,
-    };
-    let mut res = new_null_array(&args[0].data_type(), size);
+    let return_type = args[0].data_type();
+    let mut return_array = args.iter().filter_map(|x| match x {
+        ColumnarValue::Array(array) => Some(array.len()),
+        _ => None,
+    });
+
+    if let Some(size) = return_array.next() {
+        // start with nulls as default output
+        let mut current_value = new_null_array(&return_type, size);
+        let mut remainder = BooleanArray::from(vec![true; size]);
 
-    for column_value in args {
-        for i in 0..size {
-            match column_value {
-                ColumnarValue::Array(array_ref) => {
-                    let curr_null_mask = compute::is_null(res.as_ref())?;
-                    let arr_not_null_mask = compute::is_not_null(array_ref)?;
-                    let bool_mask = compute::and(&curr_null_mask, &arr_not_null_mask)?;
-                    res = zip(&bool_mask, array_ref, &res)?;
+        for arg in args {
+            match arg {
+                ColumnarValue::Array(ref array) => {
+                    let to_apply = and(&remainder, &is_not_null(array.as_ref())?)?;
+                    current_value = zip(&to_apply, array, current_value.as_ref())?;
+                    remainder = and(&remainder, &is_null(array)?)?;
                 }
-                ColumnarValue::Scalar(scalar) => {
-                    if !scalar.is_null() && res.is_null(i) {
-                        let vec: Vec<bool> =
-                            (0..size).into_iter().map(|j| j == i).collect();
-                        let bool_arr = BooleanArray::from(vec);
-                        res =
-                            zip(&bool_arr, scalar.to_array_of_size(size).as_ref(), &res)?;
+                ColumnarValue::Scalar(value) => {
+                    if value.is_null() {
                         continue;
+                    } else {
+                        let last_value = value.to_array_of_size(size);
+                        current_value =
+                            zip(&remainder, &last_value, current_value.as_ref())?;
+                        break;
                     }
                 }
             }
+            if remainder.iter().all(|x| x == Some(false)) {
+                break;
+            }
         }
+        Ok(ColumnarValue::Array(current_value))
+    } else {
+        let result = args
+            .iter()
+            .filter_map(|x| match x {
+                ColumnarValue::Scalar(s) if !s.is_null() => Some(x.clone()),
+                _ => None,
+            })
+            .next()
+            .unwrap_or_else(|| args[0].clone());
+        Ok(result)
     }
-
-    Ok(ColumnarValue::Array(Arc::new(res)))
 }
 
 /// Currently supported types by the coalesce function.
diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs
index 02f1d3c38..df763ec9a 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -19,7 +19,8 @@ use std::{any::Any, sync::Arc};
 
 use crate::expressions::try_cast;
 use crate::PhysicalExpr;
-use arrow::array::{self, *};
+use arrow::array::*;
+use arrow::compute::kernels::zip::zip;
 use arrow::compute::{and, eq_dyn, is_null, not, or, or_kleene};
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
@@ -107,144 +108,6 @@ impl CaseExpr {
     }
 }
 
-macro_rules! if_then_else {
-    ($BUILDER_TYPE:ty, $ARRAY_TYPE:ty, $BOOLS:expr, $TRUE:expr, $FALSE:expr) => {{
-        let true_values = $TRUE
-            .as_ref()
-            .as_any()
-            .downcast_ref::<$ARRAY_TYPE>()
-            .expect("true_values downcast failed");
-
-        let false_values = $FALSE
-            .as_ref()
-            .as_any()
-            .downcast_ref::<$ARRAY_TYPE>()
-            .expect("false_values downcast failed");
-
-        let mut builder = <$BUILDER_TYPE>::new($BOOLS.len());
-        for i in 0..$BOOLS.len() {
-            if $BOOLS.is_null(i) {
-                if false_values.is_null(i) {
-                    builder.append_null()?;
-                } else {
-                    builder.append_value(false_values.value(i))?;
-                }
-            } else if $BOOLS.value(i) {
-                if true_values.is_null(i) {
-                    builder.append_null()?;
-                } else {
-                    builder.append_value(true_values.value(i))?;
-                }
-            } else {
-                if false_values.is_null(i) {
-                    builder.append_null()?;
-                } else {
-                    builder.append_value(false_values.value(i))?;
-                }
-            }
-        }
-        Ok(Arc::new(builder.finish()))
-    }};
-}
-
-fn if_then_else(
-    bools: &BooleanArray,
-    true_values: ArrayRef,
-    false_values: ArrayRef,
-    data_type: &DataType,
-) -> Result<ArrayRef> {
-    match data_type {
-        DataType::UInt8 => if_then_else!(
-            array::UInt8Builder,
-            array::UInt8Array,
-            bools,
-            true_values,
-            false_values
-        ),
-        DataType::UInt16 => if_then_else!(
-            array::UInt16Builder,
-            array::UInt16Array,
-            bools,
-            true_values,
-            false_values
-        ),
-        DataType::UInt32 => if_then_else!(
-            array::UInt32Builder,
-            array::UInt32Array,
-            bools,
-            true_values,
-            false_values
-        ),
-        DataType::UInt64 => if_then_else!(
-            array::UInt64Builder,
-            array::UInt64Array,
-            bools,
-            true_values,
-            false_values
-        ),
-        DataType::Int8 => if_then_else!(
-            array::Int8Builder,
-            array::Int8Array,
-            bools,
-            true_values,
-            false_values
-        ),
-        DataType::Int16 => if_then_else!(
-            array::Int16Builder,
-            array::Int16Array,
-            bools,
-            true_values,
-            false_values
-        ),
-        DataType::Int32 => if_then_else!(
-            array::Int32Builder,
-            array::Int32Array,
-            bools,
-            true_values,
-            false_values
-        ),
-        DataType::Int64 => if_then_else!(
-            array::Int64Builder,
-            array::Int64Array,
-            bools,
-            true_values,
-            false_values
-        ),
-        DataType::Float32 => if_then_else!(
-            array::Float32Builder,
-            array::Float32Array,
-            bools,
-            true_values,
-            false_values
-        ),
-        DataType::Float64 => if_then_else!(
-            array::Float64Builder,
-            array::Float64Array,
-            bools,
-            true_values,
-            false_values
-        ),
-        DataType::Utf8 => if_then_else!(
-            array::StringBuilder,
-            array::StringArray,
-            bools,
-            true_values,
-            false_values
-        ),
-        DataType::Boolean => if_then_else!(
-            array::BooleanBuilder,
-            array::BooleanArray,
-            bools,
-            true_values,
-            false_values
-        ),
-        other => Err(DataFusionError::Execution(format!(
-            "CASE does not support '{:?}'",
-            other
-        ))),
-    }
-}
-
 impl CaseExpr {
     /// This function evaluates the form of CASE that matches an expression to fixed values.
     ///
@@ -278,7 +141,7 @@ impl CaseExpr {
             let then_value = then_value.into_array(batch.num_rows());
 
             current_value =
-                if_then_else(&when_match, then_value, current_value, &return_type)?;
+                zip(&when_match, then_value.as_ref(), current_value.as_ref())?;
 
             remainder = and(&remainder, &or_kleene(&not(&when_match)?, &base_nulls)?)?;
         }
@@ -292,7 +155,7 @@ impl CaseExpr {
             let else_ = expr
                 .evaluate_selection(batch, &remainder)?
                 .into_array(batch.num_rows());
-            current_value = if_then_else(&remainder, else_, current_value, &return_type)?;
+            current_value = zip(&remainder, else_.as_ref(), current_value.as_ref())?;
         }
 
         Ok(ColumnarValue::Array(current_value))
@@ -327,8 +190,7 @@ impl CaseExpr {
                 .evaluate_selection(batch, when_value)?;
             let then_value = then_value.into_array(batch.num_rows());
 
-            current_value =
-                if_then_else(when_value, then_value, current_value, &return_type)?;
+            current_value = zip(when_value, then_value.as_ref(), current_value.as_ref())?;
 
             // Succeed tuples should be filtered out for short-circuit evaluation,
             // null values for the current when expr should be kept
@@ -345,7 +207,7 @@ impl CaseExpr {
             let else_ = expr
                 .evaluate_selection(batch, &remainder)?
                 .into_array(batch.num_rows());
-            current_value = if_then_else(&remainder, else_, current_value, &return_type)?;
+            current_value = zip(&remainder, else_.as_ref(), current_value.as_ref())?;
         }
 
         Ok(ColumnarValue::Array(current_value))