You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by yj...@apache.org on 2022/04/07 10:31:48 UTC
[arrow-datafusion] branch master updated: minor: Avoid per cell evaluation in Coalesce, use zip in CaseWhen (#2171)
This is an automated email from the ASF dual-hosted git repository.
yjshen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 0da1f370f minor: Avoid per cell evaluation in Coalesce, use zip in CaseWhen (#2171)
0da1f370f is described below
commit 0da1f370f36a04ce1648c195e0bfe8e03a1c2c2b
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Thu Apr 7 18:31:43 2022 +0800
minor: Avoid per cell evaluation in Coalesce, use zip in CaseWhen (#2171)
* Avoid per cell evaluation in Coalesce, use zip in CaseWhen
* stop early
---
.../physical-expr/src/conditional_expressions.rs | 63 +++++----
datafusion/physical-expr/src/expressions/case.rs | 150 +--------------------
2 files changed, 44 insertions(+), 169 deletions(-)
diff --git a/datafusion/physical-expr/src/conditional_expressions.rs b/datafusion/physical-expr/src/conditional_expressions.rs
index 557d096c1..4e8675f8e 100644
--- a/datafusion/physical-expr/src/conditional_expressions.rs
+++ b/datafusion/physical-expr/src/conditional_expressions.rs
@@ -15,11 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-use std::sync::Arc;
-
use arrow::array::{new_null_array, Array, BooleanArray};
-use arrow::compute;
use arrow::compute::kernels::zip::zip;
+use arrow::compute::{and, is_not_null, is_null};
use arrow::datatypes::DataType;
use datafusion_common::{DataFusionError, Result};
@@ -35,36 +33,51 @@ pub fn coalesce(args: &[ColumnarValue]) -> Result<ColumnarValue> {
)));
}
- let size = match args[0] {
- ColumnarValue::Array(ref a) => a.len(),
- ColumnarValue::Scalar(ref _s) => 1,
- };
- let mut res = new_null_array(&args[0].data_type(), size);
+ let return_type = args[0].data_type();
+ let mut return_array = args.iter().filter_map(|x| match x {
+ ColumnarValue::Array(array) => Some(array.len()),
+ _ => None,
+ });
+
+ if let Some(size) = return_array.next() {
+ // start with nulls as default output
+ let mut current_value = new_null_array(&return_type, size);
+ let mut remainder = BooleanArray::from(vec![true; size]);
- for column_value in args {
- for i in 0..size {
- match column_value {
- ColumnarValue::Array(array_ref) => {
- let curr_null_mask = compute::is_null(res.as_ref())?;
- let arr_not_null_mask = compute::is_not_null(array_ref)?;
- let bool_mask = compute::and(&curr_null_mask, &arr_not_null_mask)?;
- res = zip(&bool_mask, array_ref, &res)?;
+ for arg in args {
+ match arg {
+ ColumnarValue::Array(ref array) => {
+ let to_apply = and(&remainder, &is_not_null(array.as_ref())?)?;
+ current_value = zip(&to_apply, array, current_value.as_ref())?;
+ remainder = and(&remainder, &is_null(array)?)?;
}
- ColumnarValue::Scalar(scalar) => {
- if !scalar.is_null() && res.is_null(i) {
- let vec: Vec<bool> =
- (0..size).into_iter().map(|j| j == i).collect();
- let bool_arr = BooleanArray::from(vec);
- res =
- zip(&bool_arr, scalar.to_array_of_size(size).as_ref(), &res)?;
+ ColumnarValue::Scalar(value) => {
+ if value.is_null() {
continue;
+ } else {
+ let last_value = value.to_array_of_size(size);
+ current_value =
+ zip(&remainder, &last_value, current_value.as_ref())?;
+ break;
}
}
}
+ if remainder.iter().all(|x| x == Some(false)) {
+ break;
+ }
}
+ Ok(ColumnarValue::Array(current_value))
+ } else {
+ let result = args
+ .iter()
+ .filter_map(|x| match x {
+ ColumnarValue::Scalar(s) if !s.is_null() => Some(x.clone()),
+ _ => None,
+ })
+ .next()
+ .unwrap_or_else(|| args[0].clone());
+ Ok(result)
}
-
- Ok(ColumnarValue::Array(Arc::new(res)))
}
/// Currently supported types by the coalesce function.
diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs
index 02f1d3c38..df763ec9a 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -19,7 +19,8 @@ use std::{any::Any, sync::Arc};
use crate::expressions::try_cast;
use crate::PhysicalExpr;
-use arrow::array::{self, *};
+use arrow::array::*;
+use arrow::compute::kernels::zip::zip;
use arrow::compute::{and, eq_dyn, is_null, not, or, or_kleene};
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
@@ -107,144 +108,6 @@ impl CaseExpr {
}
}
-macro_rules! if_then_else {
- ($BUILDER_TYPE:ty, $ARRAY_TYPE:ty, $BOOLS:expr, $TRUE:expr, $FALSE:expr) => {{
- let true_values = $TRUE
- .as_ref()
- .as_any()
- .downcast_ref::<$ARRAY_TYPE>()
- .expect("true_values downcast failed");
-
- let false_values = $FALSE
- .as_ref()
- .as_any()
- .downcast_ref::<$ARRAY_TYPE>()
- .expect("false_values downcast failed");
-
- let mut builder = <$BUILDER_TYPE>::new($BOOLS.len());
- for i in 0..$BOOLS.len() {
- if $BOOLS.is_null(i) {
- if false_values.is_null(i) {
- builder.append_null()?;
- } else {
- builder.append_value(false_values.value(i))?;
- }
- } else if $BOOLS.value(i) {
- if true_values.is_null(i) {
- builder.append_null()?;
- } else {
- builder.append_value(true_values.value(i))?;
- }
- } else {
- if false_values.is_null(i) {
- builder.append_null()?;
- } else {
- builder.append_value(false_values.value(i))?;
- }
- }
- }
- Ok(Arc::new(builder.finish()))
- }};
-}
-
-fn if_then_else(
- bools: &BooleanArray,
- true_values: ArrayRef,
- false_values: ArrayRef,
- data_type: &DataType,
-) -> Result<ArrayRef> {
- match data_type {
- DataType::UInt8 => if_then_else!(
- array::UInt8Builder,
- array::UInt8Array,
- bools,
- true_values,
- false_values
- ),
- DataType::UInt16 => if_then_else!(
- array::UInt16Builder,
- array::UInt16Array,
- bools,
- true_values,
- false_values
- ),
- DataType::UInt32 => if_then_else!(
- array::UInt32Builder,
- array::UInt32Array,
- bools,
- true_values,
- false_values
- ),
- DataType::UInt64 => if_then_else!(
- array::UInt64Builder,
- array::UInt64Array,
- bools,
- true_values,
- false_values
- ),
- DataType::Int8 => if_then_else!(
- array::Int8Builder,
- array::Int8Array,
- bools,
- true_values,
- false_values
- ),
- DataType::Int16 => if_then_else!(
- array::Int16Builder,
- array::Int16Array,
- bools,
- true_values,
- false_values
- ),
- DataType::Int32 => if_then_else!(
- array::Int32Builder,
- array::Int32Array,
- bools,
- true_values,
- false_values
- ),
- DataType::Int64 => if_then_else!(
- array::Int64Builder,
- array::Int64Array,
- bools,
- true_values,
- false_values
- ),
- DataType::Float32 => if_then_else!(
- array::Float32Builder,
- array::Float32Array,
- bools,
- true_values,
- false_values
- ),
- DataType::Float64 => if_then_else!(
- array::Float64Builder,
- array::Float64Array,
- bools,
- true_values,
- false_values
- ),
- DataType::Utf8 => if_then_else!(
- array::StringBuilder,
- array::StringArray,
- bools,
- true_values,
- false_values
- ),
- DataType::Boolean => if_then_else!(
- array::BooleanBuilder,
- array::BooleanArray,
- bools,
- true_values,
- false_values
- ),
- other => Err(DataFusionError::Execution(format!(
- "CASE does not support '{:?}'",
- other
- ))),
- }
-}
-
impl CaseExpr {
/// This function evaluates the form of CASE that matches an expression to fixed values.
///
@@ -278,7 +141,7 @@ impl CaseExpr {
let then_value = then_value.into_array(batch.num_rows());
current_value =
- if_then_else(&when_match, then_value, current_value, &return_type)?;
+ zip(&when_match, then_value.as_ref(), current_value.as_ref())?;
remainder = and(&remainder, &or_kleene(&not(&when_match)?, &base_nulls)?)?;
}
@@ -292,7 +155,7 @@ impl CaseExpr {
let else_ = expr
.evaluate_selection(batch, &remainder)?
.into_array(batch.num_rows());
- current_value = if_then_else(&remainder, else_, current_value, &return_type)?;
+ current_value = zip(&remainder, else_.as_ref(), current_value.as_ref())?;
}
Ok(ColumnarValue::Array(current_value))
@@ -327,8 +190,7 @@ impl CaseExpr {
.evaluate_selection(batch, when_value)?;
let then_value = then_value.into_array(batch.num_rows());
- current_value =
- if_then_else(when_value, then_value, current_value, &return_type)?;
+ current_value = zip(when_value, then_value.as_ref(), current_value.as_ref())?;
// Succeed tuples should be filtered out for short-circuit evaluation,
// null values for the current when expr should be kept
@@ -345,7 +207,7 @@ impl CaseExpr {
let else_ = expr
.evaluate_selection(batch, &remainder)?
.into_array(batch.num_rows());
- current_value = if_then_else(&remainder, else_, current_value, &return_type)?;
+ current_value = zip(&remainder, else_.as_ref(), current_value.as_ref())?;
}
Ok(ColumnarValue::Array(current_value))