You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/08/04 16:33:45 UTC
[arrow-datafusion] branch master updated: Split binary_expr.rs into smaller modules (#3026)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d07a775c5 Split binary_expr.rs into smaller modules (#3026)
d07a775c5 is described below
commit d07a775c55091195a3f615db61e49b1ef97ce5ba
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Aug 4 12:33:40 2022 -0400
Split binary_expr.rs into smaller modules (#3026)
---
datafusion/physical-expr/src/expressions/binary.rs | 791 +--------------------
.../src/expressions/binary/adapter.rs | 62 ++
.../src/expressions/binary/kernels.rs | 191 +++++
.../src/expressions/binary/kernels_arrow.rs | 647 +++++++++++++++++
4 files changed, 921 insertions(+), 770 deletions(-)
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index b7492445b..004628a80 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -15,10 +15,13 @@
// specific language governing permissions and limitations
// under the License.
+mod adapter;
+mod kernels;
+mod kernels_arrow;
+
use std::convert::TryInto;
use std::{any::Any, sync::Arc};
-use arrow::array::TimestampMillisecondArray;
use arrow::array::*;
use arrow::compute::kernels::arithmetic::{
add, add_scalar, divide, divide_scalar, modulus, modulus_scalar, multiply,
@@ -44,8 +47,23 @@ use arrow::compute::kernels::comparison::{like_utf8, nlike_utf8, regexp_is_match
use arrow::compute::kernels::comparison::{
like_utf8_scalar, nlike_utf8_scalar, regexp_is_match_utf8_scalar,
};
-use arrow::datatypes::{ArrowNumericType, DataType, Schema, TimeUnit};
-use arrow::error::ArrowError::DivideByZero;
+
+use adapter::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn};
+use kernels::{
+ bitwise_and, bitwise_and_scalar, bitwise_or, bitwise_or_scalar, string_concat,
+};
+use kernels_arrow::{
+ add_decimal, add_decimal_scalar, divide_decimal, divide_decimal_scalar,
+ eq_decimal_scalar, gt_decimal_scalar, gt_eq_decimal_scalar, is_distinct_from,
+ is_distinct_from_bool, is_distinct_from_decimal, is_distinct_from_null,
+ is_distinct_from_utf8, is_not_distinct_from, is_not_distinct_from_bool,
+ is_not_distinct_from_decimal, is_not_distinct_from_null, is_not_distinct_from_utf8,
+ lt_decimal_scalar, lt_eq_decimal_scalar, modulus_decimal, modulus_decimal_scalar,
+ multiply_decimal, multiply_decimal_scalar, neq_decimal_scalar, subtract_decimal,
+ subtract_decimal_scalar,
+};
+
+use arrow::datatypes::{DataType, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use crate::expressions::try_cast;
@@ -55,513 +73,6 @@ use datafusion_common::{DataFusionError, Result};
use datafusion_expr::binary_rule::binary_operator_data_type;
use datafusion_expr::{binary_rule::coerce_types, ColumnarValue, Operator};
-/// create a `dyn_op` wrapper function for the specified operation
-/// that call the underlying dyn_op arrow kernel if the type is
-/// supported, and translates ArrowError to DataFusionError
-macro_rules! make_dyn_comp_op {
- ($OP:tt) => {
- paste::paste! {
- /// wrapper over arrow compute kernel that maps Error types and
- /// patches missing support in arrow
- fn [<$OP _dyn>] (left: &dyn Array, right: &dyn Array) -> Result<ArrayRef> {
- match (left.data_type(), right.data_type()) {
- // Call `op_decimal` (e.g. `eq_decimal) until
- // arrow has native support
- // https://github.com/apache/arrow-rs/issues/1200
- (DataType::Decimal(_, _), DataType::Decimal(_, _)) => {
- [<$OP _decimal>](as_decimal_array(left), as_decimal_array(right))
- },
- // By default call the arrow kernel
- _ => {
- arrow::compute::kernels::comparison::[<$OP _dyn>](left, right)
- .map_err(|e| e.into())
- }
- }
- .map(|a| Arc::new(a) as ArrayRef)
- }
- }
- };
-}
-
-// create eq_dyn, gt_dyn, wrappers etc
-make_dyn_comp_op!(eq);
-make_dyn_comp_op!(gt);
-make_dyn_comp_op!(gt_eq);
-make_dyn_comp_op!(lt);
-make_dyn_comp_op!(lt_eq);
-make_dyn_comp_op!(neq);
-
-// Simple (low performance) kernels until optimized kernels are added to arrow
-// See https://github.com/apache/arrow-rs/issues/960
-
-fn is_distinct_from_bool(
- left: &BooleanArray,
- right: &BooleanArray,
-) -> Result<BooleanArray> {
- // Different from `neq_bool` because `null is distinct from null` is false and not null
- Ok(left
- .iter()
- .zip(right.iter())
- .map(|(left, right)| Some(left != right))
- .collect())
-}
-
-fn is_not_distinct_from_bool(
- left: &BooleanArray,
- right: &BooleanArray,
-) -> Result<BooleanArray> {
- Ok(left
- .iter()
- .zip(right.iter())
- .map(|(left, right)| Some(left == right))
- .collect())
-}
-
-// TODO move decimal kernels to to arrow-rs
-// https://github.com/apache/arrow-rs/issues/1200
-
-/// Creates an BooleanArray the same size as `left`,
-/// applying `op` to all non-null elements of left
-fn compare_decimal_scalar<F>(
- left: &Decimal128Array,
- right: i128,
- op: F,
-) -> Result<BooleanArray>
-where
- F: Fn(i128, i128) -> bool,
-{
- Ok(left
- .iter()
- .map(|left| left.map(|left| op(left, right)))
- .collect())
-}
-
-/// Creates an BooleanArray the same size as `left`,
-/// by applying `op` to all non-null elements of left and right
-fn compare_decimal<F>(
- left: &Decimal128Array,
- right: &Decimal128Array,
- op: F,
-) -> Result<BooleanArray>
-where
- F: Fn(i128, i128) -> bool,
-{
- Ok(left
- .iter()
- .zip(right.iter())
- .map(|(left, right)| {
- if let (Some(left), Some(right)) = (left, right) {
- Some(op(left, right))
- } else {
- None
- }
- })
- .collect())
-}
-
-pub(super) fn eq_decimal_scalar(
- left: &Decimal128Array,
- right: i128,
-) -> Result<BooleanArray> {
- compare_decimal_scalar(left, right, |left, right| left == right)
-}
-
-pub(super) fn eq_decimal(
- left: &Decimal128Array,
- right: &Decimal128Array,
-) -> Result<BooleanArray> {
- compare_decimal(left, right, |left, right| left == right)
-}
-
-fn neq_decimal_scalar(left: &Decimal128Array, right: i128) -> Result<BooleanArray> {
- compare_decimal_scalar(left, right, |left, right| left != right)
-}
-
-fn neq_decimal(left: &Decimal128Array, right: &Decimal128Array) -> Result<BooleanArray> {
- compare_decimal(left, right, |left, right| left != right)
-}
-
-fn lt_decimal_scalar(left: &Decimal128Array, right: i128) -> Result<BooleanArray> {
- compare_decimal_scalar(left, right, |left, right| left < right)
-}
-
-fn lt_decimal(left: &Decimal128Array, right: &Decimal128Array) -> Result<BooleanArray> {
- compare_decimal(left, right, |left, right| left < right)
-}
-
-fn lt_eq_decimal_scalar(left: &Decimal128Array, right: i128) -> Result<BooleanArray> {
- compare_decimal_scalar(left, right, |left, right| left <= right)
-}
-
-fn lt_eq_decimal(
- left: &Decimal128Array,
- right: &Decimal128Array,
-) -> Result<BooleanArray> {
- compare_decimal(left, right, |left, right| left <= right)
-}
-
-fn gt_decimal_scalar(left: &Decimal128Array, right: i128) -> Result<BooleanArray> {
- compare_decimal_scalar(left, right, |left, right| left > right)
-}
-
-fn gt_decimal(left: &Decimal128Array, right: &Decimal128Array) -> Result<BooleanArray> {
- compare_decimal(left, right, |left, right| left > right)
-}
-
-fn gt_eq_decimal_scalar(left: &Decimal128Array, right: i128) -> Result<BooleanArray> {
- compare_decimal_scalar(left, right, |left, right| left >= right)
-}
-
-fn gt_eq_decimal(
- left: &Decimal128Array,
- right: &Decimal128Array,
-) -> Result<BooleanArray> {
- compare_decimal(left, right, |left, right| left >= right)
-}
-
-fn is_distinct_from_decimal(
- left: &Decimal128Array,
- right: &Decimal128Array,
-) -> Result<BooleanArray> {
- Ok(left
- .iter()
- .zip(right.iter())
- .map(|(left, right)| match (left, right) {
- (None, None) => Some(false),
- (None, Some(_)) | (Some(_), None) => Some(true),
- (Some(left), Some(right)) => Some(left != right),
- })
- .collect())
-}
-
-fn is_not_distinct_from_decimal(
- left: &Decimal128Array,
- right: &Decimal128Array,
-) -> Result<BooleanArray> {
- Ok(left
- .iter()
- .zip(right.iter())
- .map(|(left, right)| match (left, right) {
- (None, None) => Some(true),
- (None, Some(_)) | (Some(_), None) => Some(false),
- (Some(left), Some(right)) => Some(left == right),
- })
- .collect())
-}
-
-/// Creates an Decimal128Array the same size as `left`,
-/// by applying `op` to all non-null elements of left and right
-fn arith_decimal<F>(
- left: &Decimal128Array,
- right: &Decimal128Array,
- op: F,
-) -> Result<Decimal128Array>
-where
- F: Fn(i128, i128) -> Result<i128>,
-{
- left.iter()
- .zip(right.iter())
- .map(|(left, right)| {
- if let (Some(left), Some(right)) = (left, right) {
- Some(op(left, right)).transpose()
- } else {
- Ok(None)
- }
- })
- .collect()
-}
-
-fn arith_decimal_scalar<F>(
- left: &Decimal128Array,
- right: i128,
- op: F,
-) -> Result<Decimal128Array>
-where
- F: Fn(i128, i128) -> Result<i128>,
-{
- left.iter()
- .map(|left| {
- if let Some(left) = left {
- Some(op(left, right)).transpose()
- } else {
- Ok(None)
- }
- })
- .collect()
-}
-
-fn add_decimal(
- left: &Decimal128Array,
- right: &Decimal128Array,
-) -> Result<Decimal128Array> {
- let array = arith_decimal(left, right, |left, right| Ok(left + right))?
- .with_precision_and_scale(left.precision(), left.scale())?;
- Ok(array)
-}
-
-fn add_decimal_scalar(left: &Decimal128Array, right: i128) -> Result<Decimal128Array> {
- let array = arith_decimal_scalar(left, right, |left, right| Ok(left + right))?
- .with_precision_and_scale(left.precision(), left.scale())?;
- Ok(array)
-}
-
-fn subtract_decimal(
- left: &Decimal128Array,
- right: &Decimal128Array,
-) -> Result<Decimal128Array> {
- let array = arith_decimal(left, right, |left, right| Ok(left - right))?
- .with_precision_and_scale(left.precision(), left.scale())?;
- Ok(array)
-}
-
-fn subtract_decimal_scalar(
- left: &Decimal128Array,
- right: i128,
-) -> Result<Decimal128Array> {
- let array = arith_decimal_scalar(left, right, |left, right| Ok(left - right))?
- .with_precision_and_scale(left.precision(), left.scale())?;
- Ok(array)
-}
-
-fn multiply_decimal(
- left: &Decimal128Array,
- right: &Decimal128Array,
-) -> Result<Decimal128Array> {
- let divide = 10_i128.pow(left.scale() as u32);
- let array = arith_decimal(left, right, |left, right| Ok(left * right / divide))?
- .with_precision_and_scale(left.precision(), left.scale())?;
- Ok(array)
-}
-
-fn multiply_decimal_scalar(
- left: &Decimal128Array,
- right: i128,
-) -> Result<Decimal128Array> {
- let divide = 10_i128.pow(left.scale() as u32);
- let array =
- arith_decimal_scalar(left, right, |left, right| Ok(left * right / divide))?
- .with_precision_and_scale(left.precision(), left.scale())?;
- Ok(array)
-}
-
-fn divide_decimal(
- left: &Decimal128Array,
- right: &Decimal128Array,
-) -> Result<Decimal128Array> {
- let mul = 10_f64.powi(left.scale() as i32);
- let array = arith_decimal(left, right, |left, right| {
- let l_value = left as f64;
- let r_value = right as f64;
- let result = ((l_value / r_value) * mul) as i128;
- Ok(result)
- })?
- .with_precision_and_scale(left.precision(), left.scale())?;
- Ok(array)
-}
-
-fn divide_decimal_scalar(left: &Decimal128Array, right: i128) -> Result<Decimal128Array> {
- let mul = 10_f64.powi(left.scale() as i32);
- let array = arith_decimal_scalar(left, right, |left, right| {
- let l_value = left as f64;
- let r_value = right as f64;
- let result = ((l_value / r_value) * mul) as i128;
- Ok(result)
- })?
- .with_precision_and_scale(left.precision(), left.scale())?;
- Ok(array)
-}
-
-fn modulus_decimal(
- left: &Decimal128Array,
- right: &Decimal128Array,
-) -> Result<Decimal128Array> {
- let array = arith_decimal(left, right, |left, right| {
- if right == 0 {
- Err(DataFusionError::ArrowError(DivideByZero))
- } else {
- Ok(left % right)
- }
- })?
- .with_precision_and_scale(left.precision(), left.scale())?;
- Ok(array)
-}
-
-fn modulus_decimal_scalar(
- left: &Decimal128Array,
- right: i128,
-) -> Result<Decimal128Array> {
- if right == 0 {
- return Err(DataFusionError::ArrowError(DivideByZero));
- }
- let array = arith_decimal_scalar(left, right, |left, right| Ok(left % right))?
- .with_precision_and_scale(left.precision(), left.scale())?;
- Ok(array)
-}
-
-/// The binary_bitwise_array_op macro only evaluates for integer types
-/// like int64, int32.
-/// It is used to do bitwise operation.
-macro_rules! binary_bitwise_array_op {
- ($LEFT:expr, $RIGHT:expr, $OP:tt, $ARRAY_TYPE:ident, $TYPE:ty) => {{
- let len = $LEFT.len();
- let left = $LEFT.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
- let right = $RIGHT.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
- let result = (0..len)
- .into_iter()
- .map(|i| {
- if left.is_null(i) || right.is_null(i) {
- None
- } else {
- Some(left.value(i) $OP right.value(i))
- }
- })
- .collect::<$ARRAY_TYPE>();
- Ok(Arc::new(result))
- }};
-}
-
-/// The binary_bitwise_array_op macro only evaluates for integer types
-/// like int64, int32.
-/// It is used to do bitwise operation on an array with a scalar.
-macro_rules! binary_bitwise_array_scalar {
- ($LEFT:expr, $RIGHT:expr, $OP:tt, $ARRAY_TYPE:ident, $TYPE:ty) => {{
- let len = $LEFT.len();
- let array = $LEFT.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
- let scalar = $RIGHT;
- if scalar.is_null() {
- Ok(new_null_array(array.data_type(), len))
- } else {
- let right: $TYPE = scalar.try_into().unwrap();
- let result = (0..len)
- .into_iter()
- .map(|i| {
- if array.is_null(i) {
- None
- } else {
- Some(array.value(i) $OP right)
- }
- })
- .collect::<$ARRAY_TYPE>();
- Ok(Arc::new(result) as ArrayRef)
- }
- }};
-}
-
-fn bitwise_and(left: ArrayRef, right: ArrayRef) -> Result<ArrayRef> {
- match &left.data_type() {
- DataType::Int8 => {
- binary_bitwise_array_op!(left, right, &, Int8Array, i8)
- }
- DataType::Int16 => {
- binary_bitwise_array_op!(left, right, &, Int16Array, i16)
- }
- DataType::Int32 => {
- binary_bitwise_array_op!(left, right, &, Int32Array, i32)
- }
- DataType::Int64 => {
- binary_bitwise_array_op!(left, right, &, Int64Array, i64)
- }
- other => Err(DataFusionError::Internal(format!(
- "Data type {:?} not supported for binary operation '{}' on dyn arrays",
- other,
- Operator::BitwiseAnd
- ))),
- }
-}
-
-fn bitwise_or(left: ArrayRef, right: ArrayRef) -> Result<ArrayRef> {
- match &left.data_type() {
- DataType::Int8 => {
- binary_bitwise_array_op!(left, right, |, Int8Array, i8)
- }
- DataType::Int16 => {
- binary_bitwise_array_op!(left, right, |, Int16Array, i16)
- }
- DataType::Int32 => {
- binary_bitwise_array_op!(left, right, |, Int32Array, i32)
- }
- DataType::Int64 => {
- binary_bitwise_array_op!(left, right, |, Int64Array, i64)
- }
- other => Err(DataFusionError::Internal(format!(
- "Data type {:?} not supported for binary operation '{}' on dyn arrays",
- other,
- Operator::BitwiseOr
- ))),
- }
-}
-
-/// Concat lhs and rhs String Array, any `NULL` exists on lhs or rhs will come out result `NULL`
-/// 1. 'a' || 'b' || 32 = 'ab32'
-/// 2. 'a' || NULL = NULL
-fn string_concat(left: ArrayRef, right: ArrayRef) -> Result<ArrayRef> {
- let left_array = left.as_any().downcast_ref::<StringArray>().unwrap();
- let right_array = right.as_any().downcast_ref::<StringArray>().unwrap();
- let result = (0..left.len())
- .into_iter()
- .map(|i| {
- if left.is_null(i) || right.is_null(i) {
- None
- } else {
- let mut owned_string: String = "".to_owned();
- owned_string.push_str(left_array.value(i));
- owned_string.push_str(right_array.value(i));
- Some(owned_string)
- }
- })
- .collect::<StringArray>();
- Ok(Arc::new(result) as ArrayRef)
-}
-
-fn bitwise_and_scalar(
- array: &dyn Array,
- scalar: ScalarValue,
-) -> Option<Result<ArrayRef>> {
- let result = match array.data_type() {
- DataType::Int8 => {
- binary_bitwise_array_scalar!(array, scalar, &, Int8Array, i8)
- }
- DataType::Int16 => {
- binary_bitwise_array_scalar!(array, scalar, &, Int16Array, i16)
- }
- DataType::Int32 => {
- binary_bitwise_array_scalar!(array, scalar, &, Int32Array, i32)
- }
- DataType::Int64 => {
- binary_bitwise_array_scalar!(array, scalar, &, Int64Array, i64)
- }
- other => Err(DataFusionError::Internal(format!(
- "Data type {:?} not supported for binary operation '{}' on dyn arrays",
- other,
- Operator::BitwiseAnd
- ))),
- };
- Some(result)
-}
-
-fn bitwise_or_scalar(array: &dyn Array, scalar: ScalarValue) -> Option<Result<ArrayRef>> {
- let result = match array.data_type() {
- DataType::Int8 => {
- binary_bitwise_array_scalar!(array, scalar, |, Int8Array, i8)
- }
- DataType::Int16 => {
- binary_bitwise_array_scalar!(array, scalar, |, Int16Array, i16)
- }
- DataType::Int32 => {
- binary_bitwise_array_scalar!(array, scalar, |, Int32Array, i32)
- }
- DataType::Int64 => {
- binary_bitwise_array_scalar!(array, scalar, |, Int64Array, i64)
- }
- other => Err(DataFusionError::Internal(format!(
- "Data type {:?} not supported for binary operation '{}' on dyn arrays",
- other,
- Operator::BitwiseOr
- ))),
- };
- Some(result)
-}
-
/// Binary expression
#[derive(Debug)]
pub struct BinaryExpr {
@@ -1345,73 +856,6 @@ impl BinaryExpr {
}
}
-fn is_distinct_from<T>(
- left: &PrimitiveArray<T>,
- right: &PrimitiveArray<T>,
-) -> Result<BooleanArray>
-where
- T: ArrowNumericType,
-{
- Ok(left
- .iter()
- .zip(right.iter())
- .map(|(x, y)| Some(x != y))
- .collect())
-}
-
-fn is_distinct_from_utf8<OffsetSize: OffsetSizeTrait>(
- left: &GenericStringArray<OffsetSize>,
- right: &GenericStringArray<OffsetSize>,
-) -> Result<BooleanArray> {
- Ok(left
- .iter()
- .zip(right.iter())
- .map(|(x, y)| Some(x != y))
- .collect())
-}
-
-fn is_distinct_from_null(left: &NullArray, _right: &NullArray) -> Result<BooleanArray> {
- let length = left.len();
- make_boolean_array(length, false)
-}
-
-fn is_not_distinct_from_null(
- left: &NullArray,
- _right: &NullArray,
-) -> Result<BooleanArray> {
- let length = left.len();
- make_boolean_array(length, true)
-}
-
-fn make_boolean_array(length: usize, value: bool) -> Result<BooleanArray> {
- Ok((0..length).into_iter().map(|_| Some(value)).collect())
-}
-
-fn is_not_distinct_from<T>(
- left: &PrimitiveArray<T>,
- right: &PrimitiveArray<T>,
-) -> Result<BooleanArray>
-where
- T: ArrowNumericType,
-{
- Ok(left
- .iter()
- .zip(right.iter())
- .map(|(x, y)| Some(x == y))
- .collect())
-}
-
-fn is_not_distinct_from_utf8<OffsetSize: OffsetSizeTrait>(
- left: &GenericStringArray<OffsetSize>,
- right: &GenericStringArray<OffsetSize>,
-) -> Result<BooleanArray> {
- Ok(left
- .iter()
- .zip(right.iter())
- .map(|(x, y)| Some(x == y))
- .collect())
-}
-
/// return two physical expressions that are optionally coerced to a
/// common type that the binary operator supports.
fn binary_cast(
@@ -2674,119 +2118,6 @@ mod tests {
decimal_builder.finish()
}
- #[test]
- fn comparison_decimal_op_test() -> Result<()> {
- let value_i128: i128 = 123;
- let decimal_array = create_decimal_array(
- &[
- Some(value_i128),
- None,
- Some(value_i128 - 1),
- Some(value_i128 + 1),
- ],
- 25,
- 3,
- );
- // eq: array = i128
- let result = eq_decimal_scalar(&decimal_array, value_i128)?;
- assert_eq!(
- BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
- result
- );
- // neq: array != i128
- let result = neq_decimal_scalar(&decimal_array, value_i128)?;
- assert_eq!(
- BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
- result
- );
- // lt: array < i128
- let result = lt_decimal_scalar(&decimal_array, value_i128)?;
- assert_eq!(
- BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
- result
- );
- // lt_eq: array <= i128
- let result = lt_eq_decimal_scalar(&decimal_array, value_i128)?;
- assert_eq!(
- BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
- result
- );
- // gt: array > i128
- let result = gt_decimal_scalar(&decimal_array, value_i128)?;
- assert_eq!(
- BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
- result
- );
- // gt_eq: array >= i128
- let result = gt_eq_decimal_scalar(&decimal_array, value_i128)?;
- assert_eq!(
- BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
- result
- );
-
- let left_decimal_array = decimal_array;
- let right_decimal_array = create_decimal_array(
- &[
- Some(value_i128 - 1),
- Some(value_i128),
- Some(value_i128 + 1),
- Some(value_i128 + 1),
- ],
- 25,
- 3,
- );
- // eq: left == right
- let result = eq_decimal(&left_decimal_array, &right_decimal_array)?;
- assert_eq!(
- BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
- result
- );
- // neq: left != right
- let result = neq_decimal(&left_decimal_array, &right_decimal_array)?;
- assert_eq!(
- BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
- result
- );
- // lt: left < right
- let result = lt_decimal(&left_decimal_array, &right_decimal_array)?;
- assert_eq!(
- BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
- result
- );
- // lt_eq: left <= right
- let result = lt_eq_decimal(&left_decimal_array, &right_decimal_array)?;
- assert_eq!(
- BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
- result
- );
- // gt: left > right
- let result = gt_decimal(&left_decimal_array, &right_decimal_array)?;
- assert_eq!(
- BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
- result
- );
- // gt_eq: left >= right
- let result = gt_eq_decimal(&left_decimal_array, &right_decimal_array)?;
- assert_eq!(
- BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
- result
- );
- // is_distinct: left distinct right
- let result = is_distinct_from_decimal(&left_decimal_array, &right_decimal_array)?;
- assert_eq!(
- BooleanArray::from(vec![Some(true), Some(true), Some(true), Some(false)]),
- result
- );
- // is_distinct: left distinct right
- let result =
- is_not_distinct_from_decimal(&left_decimal_array, &right_decimal_array)?;
- assert_eq!(
- BooleanArray::from(vec![Some(false), Some(false), Some(false), Some(true)]),
- result
- );
- Ok(())
- }
-
#[test]
fn comparison_decimal_expr_test() -> Result<()> {
let decimal_scalar = ScalarValue::Decimal128(Some(123_456), 10, 3);
@@ -3000,86 +2331,6 @@ mod tests {
Ok(())
}
- #[test]
- fn arithmetic_decimal_op_test() -> Result<()> {
- let value_i128: i128 = 123;
- let left_decimal_array = create_decimal_array(
- &[
- Some(value_i128),
- None,
- Some(value_i128 - 1),
- Some(value_i128 + 1),
- ],
- 25,
- 3,
- );
- let right_decimal_array = create_decimal_array(
- &[
- Some(value_i128),
- Some(value_i128),
- Some(value_i128),
- Some(value_i128),
- ],
- 25,
- 3,
- );
- // add
- let result = add_decimal(&left_decimal_array, &right_decimal_array)?;
- let expect =
- create_decimal_array(&[Some(246), None, Some(245), Some(247)], 25, 3);
- assert_eq!(expect, result);
- let result = add_decimal_scalar(&left_decimal_array, 10)?;
- let expect =
- create_decimal_array(&[Some(133), None, Some(132), Some(134)], 25, 3);
- assert_eq!(expect, result);
- // subtract
- let result = subtract_decimal(&left_decimal_array, &right_decimal_array)?;
- let expect = create_decimal_array(&[Some(0), None, Some(-1), Some(1)], 25, 3);
- assert_eq!(expect, result);
- let result = subtract_decimal_scalar(&left_decimal_array, 10)?;
- let expect =
- create_decimal_array(&[Some(113), None, Some(112), Some(114)], 25, 3);
- assert_eq!(expect, result);
- // multiply
- let result = multiply_decimal(&left_decimal_array, &right_decimal_array)?;
- let expect = create_decimal_array(&[Some(15), None, Some(15), Some(15)], 25, 3);
- assert_eq!(expect, result);
- let result = multiply_decimal_scalar(&left_decimal_array, 10)?;
- let expect = create_decimal_array(&[Some(1), None, Some(1), Some(1)], 25, 3);
- assert_eq!(expect, result);
- // divide
- let left_decimal_array = create_decimal_array(
- &[Some(1234567), None, Some(1234567), Some(1234567)],
- 25,
- 3,
- );
- let right_decimal_array =
- create_decimal_array(&[Some(10), Some(100), Some(55), Some(-123)], 25, 3);
- let result = divide_decimal(&left_decimal_array, &right_decimal_array)?;
- let expect = create_decimal_array(
- &[Some(123456700), None, Some(22446672), Some(-10037130)],
- 25,
- 3,
- );
- assert_eq!(expect, result);
- let result = divide_decimal_scalar(&left_decimal_array, 10)?;
- let expect = create_decimal_array(
- &[Some(123456700), None, Some(123456700), Some(123456700)],
- 25,
- 3,
- );
- assert_eq!(expect, result);
- // modulus
- let result = modulus_decimal(&left_decimal_array, &right_decimal_array)?;
- let expect = create_decimal_array(&[Some(7), None, Some(37), Some(16)], 25, 3);
- assert_eq!(expect, result);
- let result = modulus_decimal_scalar(&left_decimal_array, 10)?;
- let expect = create_decimal_array(&[Some(7), None, Some(7), Some(7)], 25, 3);
- assert_eq!(expect, result);
-
- Ok(())
- }
-
fn apply_arithmetic_op(
schema: &SchemaRef,
left: &ArrayRef,
diff --git a/datafusion/physical-expr/src/expressions/binary/adapter.rs b/datafusion/physical-expr/src/expressions/binary/adapter.rs
new file mode 100644
index 000000000..b0293cdf0
--- /dev/null
+++ b/datafusion/physical-expr/src/expressions/binary/adapter.rs
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains functions that change types or names of other
+//! kernels to make them compatible with the main dispatch logic
+
+use std::sync::Arc;
+
+use super::kernels_arrow::*;
+use arrow::array::*;
+use arrow::datatypes::DataType;
+use datafusion_common::Result;
+
+/// Create a `<op>_dyn` wrapper function for the specified operation that
+/// calls the local `<op>_decimal` kernel for Decimal inputs and the arrow
+/// `<op>_dyn` kernel otherwise, translating ArrowError to DataFusionError
+macro_rules! make_dyn_comp_op {
+ ($OP:tt) => {
+ paste::paste! {
+ /// wrapper over arrow compute kernel that maps Error types and
+ /// patches missing support in arrow
+ pub(crate) fn [<$OP _dyn>] (left: &dyn Array, right: &dyn Array) -> Result<ArrayRef> {
+ match (left.data_type(), right.data_type()) {
+ // Call `op_decimal` (e.g. `eq_decimal`) until
+ // arrow has native support
+ // https://github.com/apache/arrow-rs/issues/1200
+ (DataType::Decimal(_, _), DataType::Decimal(_, _)) => {
+ [<$OP _decimal>](as_decimal_array(left), as_decimal_array(right))
+ },
+ // By default call the arrow kernel
+ _ => {
+ arrow::compute::kernels::comparison::[<$OP _dyn>](left, right)
+ .map_err(|e| e.into())
+ }
+ }
+ .map(|a| Arc::new(a) as ArrayRef)
+ }
+ }
+ };
+}
+
+// create eq_dyn, gt_dyn, wrappers etc
+make_dyn_comp_op!(eq);
+make_dyn_comp_op!(gt);
+make_dyn_comp_op!(gt_eq);
+make_dyn_comp_op!(lt);
+make_dyn_comp_op!(lt_eq);
+make_dyn_comp_op!(neq);
diff --git a/datafusion/physical-expr/src/expressions/binary/kernels.rs b/datafusion/physical-expr/src/expressions/binary/kernels.rs
new file mode 100644
index 000000000..e234815af
--- /dev/null
+++ b/datafusion/physical-expr/src/expressions/binary/kernels.rs
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains computation kernels that are specific to
+//! datafusion and are not (yet) targeted for porting upstream to arrow
+use arrow::array::*;
+use arrow::datatypes::DataType;
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::Operator;
+use std::sync::Arc;
+
+/// The binary_bitwise_array_op macro only evaluates for integer types
+/// like int64, int32.
+/// It is used to do a bitwise operation between two arrays.
+macro_rules! binary_bitwise_array_op {
+ ($LEFT:expr, $RIGHT:expr, $OP:tt, $ARRAY_TYPE:ident, $TYPE:ty) => {{
+ let len = $LEFT.len();
+ let left = $LEFT.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
+ let right = $RIGHT.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
+ let result = (0..len)
+ .into_iter()
+ .map(|i| {
+ if left.is_null(i) || right.is_null(i) {
+ None
+ } else {
+ Some(left.value(i) $OP right.value(i))
+ }
+ })
+ .collect::<$ARRAY_TYPE>();
+ Ok(Arc::new(result))
+ }};
+}
+
+/// The binary_bitwise_array_scalar macro only evaluates for integer types
+/// like int64, int32.
+/// It is used to do a bitwise operation on an array with a scalar.
+macro_rules! binary_bitwise_array_scalar {
+ ($LEFT:expr, $RIGHT:expr, $OP:tt, $ARRAY_TYPE:ident, $TYPE:ty) => {{
+ let len = $LEFT.len();
+ let array = $LEFT.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
+ let scalar = $RIGHT;
+ if scalar.is_null() {
+ Ok(new_null_array(array.data_type(), len))
+ } else {
+ let right: $TYPE = scalar.try_into().unwrap();
+ let result = (0..len)
+ .into_iter()
+ .map(|i| {
+ if array.is_null(i) {
+ None
+ } else {
+ Some(array.value(i) $OP right)
+ }
+ })
+ .collect::<$ARRAY_TYPE>();
+ Ok(Arc::new(result) as ArrayRef)
+ }
+ }};
+}
+
+pub(crate) fn bitwise_and(left: ArrayRef, right: ArrayRef) -> Result<ArrayRef> {
+ match &left.data_type() {
+ DataType::Int8 => {
+ binary_bitwise_array_op!(left, right, &, Int8Array, i8)
+ }
+ DataType::Int16 => {
+ binary_bitwise_array_op!(left, right, &, Int16Array, i16)
+ }
+ DataType::Int32 => {
+ binary_bitwise_array_op!(left, right, &, Int32Array, i32)
+ }
+ DataType::Int64 => {
+ binary_bitwise_array_op!(left, right, &, Int64Array, i64)
+ }
+ other => Err(DataFusionError::Internal(format!(
+ "Data type {:?} not supported for binary operation '{}' on dyn arrays",
+ other,
+ Operator::BitwiseAnd
+ ))),
+ }
+}
+
+pub(crate) fn bitwise_or(left: ArrayRef, right: ArrayRef) -> Result<ArrayRef> {
+ match &left.data_type() {
+ DataType::Int8 => {
+ binary_bitwise_array_op!(left, right, |, Int8Array, i8)
+ }
+ DataType::Int16 => {
+ binary_bitwise_array_op!(left, right, |, Int16Array, i16)
+ }
+ DataType::Int32 => {
+ binary_bitwise_array_op!(left, right, |, Int32Array, i32)
+ }
+ DataType::Int64 => {
+ binary_bitwise_array_op!(left, right, |, Int64Array, i64)
+ }
+ other => Err(DataFusionError::Internal(format!(
+ "Data type {:?} not supported for binary operation '{}' on dyn arrays",
+ other,
+ Operator::BitwiseOr
+ ))),
+ }
+}
+
+pub(crate) fn bitwise_and_scalar(
+ array: &dyn Array,
+ scalar: ScalarValue,
+) -> Option<Result<ArrayRef>> {
+ let result = match array.data_type() {
+ DataType::Int8 => {
+ binary_bitwise_array_scalar!(array, scalar, &, Int8Array, i8)
+ }
+ DataType::Int16 => {
+ binary_bitwise_array_scalar!(array, scalar, &, Int16Array, i16)
+ }
+ DataType::Int32 => {
+ binary_bitwise_array_scalar!(array, scalar, &, Int32Array, i32)
+ }
+ DataType::Int64 => {
+ binary_bitwise_array_scalar!(array, scalar, &, Int64Array, i64)
+ }
+ other => Err(DataFusionError::Internal(format!(
+ "Data type {:?} not supported for binary operation '{}' on dyn arrays",
+ other,
+ Operator::BitwiseAnd
+ ))),
+ };
+ Some(result)
+}
+
+pub(crate) fn bitwise_or_scalar(
+ array: &dyn Array,
+ scalar: ScalarValue,
+) -> Option<Result<ArrayRef>> {
+ let result = match array.data_type() {
+ DataType::Int8 => {
+ binary_bitwise_array_scalar!(array, scalar, |, Int8Array, i8)
+ }
+ DataType::Int16 => {
+ binary_bitwise_array_scalar!(array, scalar, |, Int16Array, i16)
+ }
+ DataType::Int32 => {
+ binary_bitwise_array_scalar!(array, scalar, |, Int32Array, i32)
+ }
+ DataType::Int64 => {
+ binary_bitwise_array_scalar!(array, scalar, |, Int64Array, i64)
+ }
+ other => Err(DataFusionError::Internal(format!(
+ "Data type {:?} not supported for binary operation '{}' on dyn arrays",
+ other,
+ Operator::BitwiseOr
+ ))),
+ };
+ Some(result)
+}
+
+/// Concatenate lhs and rhs StringArrays row by row; if either side is `NULL`, the result is `NULL`
+/// 1. 'a' || 'b' || 32 = 'ab32'
+/// 2. 'a' || NULL = NULL
+pub(crate) fn string_concat(left: ArrayRef, right: ArrayRef) -> Result<ArrayRef> {
+ let left_array = left.as_any().downcast_ref::<StringArray>().unwrap();
+ let right_array = right.as_any().downcast_ref::<StringArray>().unwrap();
+ let result = (0..left.len())
+ .into_iter()
+ .map(|i| {
+ if left.is_null(i) || right.is_null(i) {
+ None
+ } else {
+ let mut owned_string: String = "".to_owned();
+ owned_string.push_str(left_array.value(i));
+ owned_string.push_str(right_array.value(i));
+ Some(owned_string)
+ }
+ })
+ .collect::<StringArray>();
+ Ok(Arc::new(result) as ArrayRef)
+}
diff --git a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
new file mode 100644
index 000000000..ba8fff716
--- /dev/null
+++ b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains computation kernels that are eventually
+//! destined for arrow-rs but are in datafusion until they are ported.
+
+use arrow::error::ArrowError;
+use arrow::{array::*, datatypes::ArrowNumericType};
+use datafusion_common::{DataFusionError, Result};
+
+// Simple (low performance) kernels until optimized kernels are added to arrow
+// See https://github.com/apache/arrow-rs/issues/960
+
+/// Computes `left IS DISTINCT FROM right` row by row for boolean arrays.
+///
+/// Unlike `neq_bool`, the result never contains `NULL`: two `NULL`s are not
+/// distinct (`false`) and a `NULL` paired with a value is distinct (`true`).
+/// If the inputs differ in length, only the common prefix is compared.
+pub(crate) fn is_distinct_from_bool(
+    left: &BooleanArray,
+    right: &BooleanArray,
+) -> Result<BooleanArray> {
+    let distinct = left
+        .iter()
+        .zip(right.iter())
+        .map(|(lhs, rhs)| lhs != rhs)
+        .map(Some)
+        .collect::<BooleanArray>();
+    Ok(distinct)
+}
+
+/// Computes `left IS NOT DISTINCT FROM right` row by row for boolean arrays.
+///
+/// The result never contains `NULL`: two `NULL`s compare equal (`true`) and a
+/// `NULL` paired with a value compares unequal (`false`).
+pub(crate) fn is_not_distinct_from_bool(
+    left: &BooleanArray,
+    right: &BooleanArray,
+) -> Result<BooleanArray> {
+    let same: BooleanArray = left
+        .iter()
+        .zip(right.iter())
+        .map(|pair| Some(pair.0 == pair.1))
+        .collect();
+    Ok(same)
+}
+
+/// Computes `left IS DISTINCT FROM right` row by row for primitive arrays.
+///
+/// Nulls are compared as values: two nulls are not distinct (`false`) and a
+/// null paired with a value is distinct (`true`), so the output has no nulls.
+pub(crate) fn is_distinct_from<T: ArrowNumericType>(
+    left: &PrimitiveArray<T>,
+    right: &PrimitiveArray<T>,
+) -> Result<BooleanArray> {
+    let rows = left.iter().zip(right.iter());
+    Ok(rows.map(|(a, b)| Some(a != b)).collect())
+}
+
+/// Computes `left IS DISTINCT FROM right` row by row for string arrays.
+///
+/// Two nulls are not distinct, and a null versus a string is distinct; the
+/// output never contains nulls.
+pub(crate) fn is_distinct_from_utf8<OffsetSize: OffsetSizeTrait>(
+    left: &GenericStringArray<OffsetSize>,
+    right: &GenericStringArray<OffsetSize>,
+) -> Result<BooleanArray> {
+    let distinct: BooleanArray = left
+        .iter()
+        .zip(right.iter())
+        .map(|(lhs, rhs)| Some(lhs.ne(&rhs)))
+        .collect();
+    Ok(distinct)
+}
+
+/// `NULL IS DISTINCT FROM NULL` is always `false`, so the result is an
+/// all-`false` array with as many rows as `left`.
+pub(crate) fn is_distinct_from_null(
+    left: &NullArray,
+    _right: &NullArray,
+) -> Result<BooleanArray> {
+    make_boolean_array(left.len(), false)
+}
+
+/// `NULL IS NOT DISTINCT FROM NULL` is always `true`, so the result is an
+/// all-`true` array with as many rows as `left`.
+pub(crate) fn is_not_distinct_from_null(
+    left: &NullArray,
+    _right: &NullArray,
+) -> Result<BooleanArray> {
+    make_boolean_array(left.len(), true)
+}
+
+/// Builds a `BooleanArray` with `length` rows, each set to `value` (no nulls).
+fn make_boolean_array(length: usize, value: bool) -> Result<BooleanArray> {
+    Ok(std::iter::repeat(Some(value)).take(length).collect())
+}
+
+/// Computes `left IS NOT DISTINCT FROM right` row by row for primitive arrays.
+///
+/// Two nulls are treated as equal (`true`) and a null versus a value as
+/// unequal (`false`); the output contains no nulls.
+pub(crate) fn is_not_distinct_from<T: ArrowNumericType>(
+    left: &PrimitiveArray<T>,
+    right: &PrimitiveArray<T>,
+) -> Result<BooleanArray> {
+    let rows = left.iter().zip(right.iter());
+    Ok(rows.map(|(a, b)| Some(a == b)).collect())
+}
+
+/// Computes `left IS NOT DISTINCT FROM right` row by row for string arrays.
+///
+/// Two nulls compare equal and a null versus a string compares unequal; the
+/// output never contains nulls.
+pub(crate) fn is_not_distinct_from_utf8<OffsetSize: OffsetSizeTrait>(
+    left: &GenericStringArray<OffsetSize>,
+    right: &GenericStringArray<OffsetSize>,
+) -> Result<BooleanArray> {
+    let same: BooleanArray = left
+        .iter()
+        .zip(right.iter())
+        .map(|(lhs, rhs)| Some(lhs.eq(&rhs)))
+        .collect();
+    Ok(same)
+}
+
+// TODO move decimal kernels to arrow-rs
+// https://github.com/apache/arrow-rs/issues/1200
+
+/// Creates a `BooleanArray` the same length as `left` by applying `op` to
+/// every non-null element of `left` and the scalar `right`.
+///
+/// Null slots in `left` stay null; `op` is never called for them. `right` is
+/// compared against the raw `i128` values, so it presumably must already be
+/// at `left`'s scale (NOTE(review): confirm callers rescale).
+pub(crate) fn compare_decimal_scalar<F>(
+    left: &Decimal128Array,
+    right: i128,
+    op: F,
+) -> Result<BooleanArray>
+where
+    F: Fn(i128, i128) -> bool,
+{
+    let slots = left.iter();
+    let compared: BooleanArray = slots.map(|slot| slot.map(|value| op(value, right))).collect();
+    Ok(compared)
+}
+
+/// Creates a `BooleanArray` by applying `op` pairwise to `left` and `right`.
+///
+/// A row is null in the output when it is null in either input; `op` is only
+/// called when both sides hold a value. If the inputs differ in length, only
+/// the common prefix is compared.
+pub(crate) fn compare_decimal<F>(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+    op: F,
+) -> Result<BooleanArray>
+where
+    F: Fn(i128, i128) -> bool,
+{
+    let compared: BooleanArray = left
+        .iter()
+        .zip(right.iter())
+        .map(|(lhs, rhs)| lhs.zip(rhs).map(|(l, r)| op(l, r)))
+        .collect();
+    Ok(compared)
+}
+
+/// Element-wise `left == right` against a raw `i128` scalar; nulls stay null.
+pub(crate) fn eq_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<BooleanArray> {
+    compare_decimal_scalar(left, right, |value, scalar| value.eq(&scalar))
+}
+
+/// Element-wise `left == right` for two decimal arrays; nulls propagate.
+pub(crate) fn eq_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<BooleanArray> {
+    compare_decimal(left, right, |lhs, rhs| lhs.eq(&rhs))
+}
+
+/// Element-wise `left != right` against a raw `i128` scalar; nulls stay null.
+pub(crate) fn neq_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<BooleanArray> {
+    compare_decimal_scalar(left, right, |value, scalar| value.ne(&scalar))
+}
+
+/// Element-wise `left != right` for two decimal arrays; nulls propagate.
+pub(crate) fn neq_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<BooleanArray> {
+    compare_decimal(left, right, |lhs, rhs| lhs.ne(&rhs))
+}
+
+/// Element-wise `left < right` against a raw `i128` scalar; nulls stay null.
+pub(crate) fn lt_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<BooleanArray> {
+    compare_decimal_scalar(left, right, |value, scalar| value.lt(&scalar))
+}
+
+/// Element-wise `left < right` for two decimal arrays; nulls propagate.
+pub(crate) fn lt_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<BooleanArray> {
+    compare_decimal(left, right, |lhs, rhs| lhs.lt(&rhs))
+}
+
+/// Element-wise `left <= right` against a raw `i128` scalar; nulls stay null.
+pub(crate) fn lt_eq_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<BooleanArray> {
+    compare_decimal_scalar(left, right, |value, scalar| value.le(&scalar))
+}
+
+/// Element-wise `left <= right` for two decimal arrays; nulls propagate.
+pub(crate) fn lt_eq_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<BooleanArray> {
+    compare_decimal(left, right, |lhs, rhs| lhs.le(&rhs))
+}
+
+/// Element-wise `left > right` against a raw `i128` scalar; nulls stay null.
+pub(crate) fn gt_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<BooleanArray> {
+    compare_decimal_scalar(left, right, |value, scalar| value.gt(&scalar))
+}
+
+/// Element-wise `left > right` for two decimal arrays; nulls propagate.
+pub(crate) fn gt_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<BooleanArray> {
+    compare_decimal(left, right, |lhs, rhs| lhs.gt(&rhs))
+}
+
+/// Element-wise `left >= right` against a raw `i128` scalar; nulls stay null.
+pub(crate) fn gt_eq_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<BooleanArray> {
+    compare_decimal_scalar(left, right, |value, scalar| value.ge(&scalar))
+}
+
+/// Element-wise `left >= right` for two decimal arrays; nulls propagate.
+pub(crate) fn gt_eq_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<BooleanArray> {
+    compare_decimal(left, right, |lhs, rhs| lhs.ge(&rhs))
+}
+
+/// Computes `left IS DISTINCT FROM right` row by row for decimal arrays.
+///
+/// Comparing the `Option`s directly gives the SQL semantics: two nulls are
+/// not distinct, a null versus a value is distinct, and two values are
+/// distinct when they differ. The output never contains nulls.
+pub(crate) fn is_distinct_from_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<BooleanArray> {
+    let distinct: BooleanArray = left
+        .iter()
+        .zip(right.iter())
+        .map(|(lhs, rhs)| Some(lhs != rhs))
+        .collect();
+    Ok(distinct)
+}
+
+/// Computes `left IS NOT DISTINCT FROM right` row by row for decimal arrays.
+///
+/// Comparing the `Option`s directly gives the SQL semantics: two nulls are
+/// equal, a null versus a value is unequal, and two values are equal when
+/// they match. The output never contains nulls.
+pub(crate) fn is_not_distinct_from_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<BooleanArray> {
+    let same: BooleanArray = left
+        .iter()
+        .zip(right.iter())
+        .map(|(lhs, rhs)| Some(lhs == rhs))
+        .collect();
+    Ok(same)
+}
+
+/// Creates a `Decimal128Array` by applying the fallible `op` pairwise to the
+/// non-null elements of `left` and `right`.
+///
+/// A row is null in the output when it is null in either input. The first
+/// `Err` returned by `op` aborts the computation and is returned as-is.
+pub(crate) fn arith_decimal<F>(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+    op: F,
+) -> Result<Decimal128Array>
+where
+    F: Fn(i128, i128) -> Result<i128>,
+{
+    left.iter()
+        .zip(right.iter())
+        .map(|(lhs, rhs)| match (lhs, rhs) {
+            (Some(l), Some(r)) => op(l, r).map(Some),
+            _ => Ok(None),
+        })
+        .collect()
+}
+
+/// Creates a `Decimal128Array` by applying the fallible `op` to every non-null
+/// element of `left` and the scalar `right`.
+///
+/// Null slots in `left` stay null; the first `Err` from `op` is returned.
+pub(crate) fn arith_decimal_scalar<F>(
+    left: &Decimal128Array,
+    right: i128,
+    op: F,
+) -> Result<Decimal128Array>
+where
+    F: Fn(i128, i128) -> Result<i128>,
+{
+    left.iter()
+        .map(|slot| match slot {
+            Some(value) => op(value, right).map(Some),
+            None => Ok(None),
+        })
+        .collect()
+}
+
+/// Adds two decimal arrays element-wise; a null on either side yields null.
+///
+/// The raw `i128` values are added directly, so both inputs are assumed to
+/// share one scale (NOTE(review): presumably ensured by type coercion). The
+/// result keeps `left`'s precision and scale.
+///
+/// # Errors
+///
+/// Returns an error if an addition overflows `i128`, or if applying `left`'s
+/// precision and scale to the result fails.
+pub(crate) fn add_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<Decimal128Array> {
+    // Two precision-38 values can sum past i128::MAX; `checked_add` turns that
+    // into an error instead of a debug-build panic or a silent release wrap.
+    let array = arith_decimal(left, right, |left, right| {
+        left.checked_add(right).ok_or_else(|| {
+            DataFusionError::ArrowError(ArrowError::ComputeError(format!(
+                "Overflow happened on: {:?} + {:?}",
+                left, right
+            )))
+        })
+    })?
+    .with_precision_and_scale(left.precision(), left.scale())?;
+    Ok(array)
+}
+
+/// Adds the raw `i128` scalar `right` to every non-null element of `left`.
+///
+/// The result keeps `left`'s precision and scale; `right` is assumed to be at
+/// that same scale (NOTE(review): confirm callers rescale).
+///
+/// # Errors
+///
+/// Returns an error if an addition overflows `i128`, or if applying `left`'s
+/// precision and scale to the result fails.
+pub(crate) fn add_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<Decimal128Array> {
+    // `checked_add` reports i128 overflow as an error instead of panicking
+    // (debug) or wrapping silently (release).
+    let array = arith_decimal_scalar(left, right, |left, right| {
+        left.checked_add(right).ok_or_else(|| {
+            DataFusionError::ArrowError(ArrowError::ComputeError(format!(
+                "Overflow happened on: {:?} + {:?}",
+                left, right
+            )))
+        })
+    })?
+    .with_precision_and_scale(left.precision(), left.scale())?;
+    Ok(array)
+}
+
+/// Subtracts `right` from `left` element-wise; a null on either side yields null.
+///
+/// The raw `i128` values are subtracted directly, so both inputs are assumed
+/// to share one scale. The result keeps `left`'s precision and scale.
+///
+/// # Errors
+///
+/// Returns an error if a subtraction overflows `i128`, or if applying
+/// `left`'s precision and scale to the result fails.
+pub(crate) fn subtract_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<Decimal128Array> {
+    // `checked_sub` reports i128 overflow as an error instead of panicking
+    // (debug) or wrapping silently (release).
+    let array = arith_decimal(left, right, |left, right| {
+        left.checked_sub(right).ok_or_else(|| {
+            DataFusionError::ArrowError(ArrowError::ComputeError(format!(
+                "Overflow happened on: {:?} - {:?}",
+                left, right
+            )))
+        })
+    })?
+    .with_precision_and_scale(left.precision(), left.scale())?;
+    Ok(array)
+}
+
+/// Subtracts the raw `i128` scalar `right` from every non-null element of
+/// `left`, keeping `left`'s precision and scale.
+///
+/// # Errors
+///
+/// Returns an error if a subtraction overflows `i128`, or if applying
+/// `left`'s precision and scale to the result fails.
+pub(crate) fn subtract_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<Decimal128Array> {
+    // `checked_sub` reports i128 overflow as an error instead of panicking
+    // (debug) or wrapping silently (release).
+    let array = arith_decimal_scalar(left, right, |left, right| {
+        left.checked_sub(right).ok_or_else(|| {
+            DataFusionError::ArrowError(ArrowError::ComputeError(format!(
+                "Overflow happened on: {:?} - {:?}",
+                left, right
+            )))
+        })
+    })?
+    .with_precision_and_scale(left.precision(), left.scale())?;
+    Ok(array)
+}
+
+/// Multiplies two decimal arrays element-wise; a null on either side yields null.
+///
+/// The raw product is divided by `10^scale` (truncating toward zero) to bring
+/// it back to `left`'s scale, so both inputs are assumed to share that scale.
+/// The result keeps `left`'s precision and scale.
+///
+/// # Errors
+///
+/// Returns an error if the raw product overflows `i128`, or if applying
+/// `left`'s precision and scale to the result fails.
+pub(crate) fn multiply_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<Decimal128Array> {
+    let divide = 10_i128.pow(left.scale() as u32);
+    // The intermediate product can exceed i128 even when the rescaled result
+    // would fit; `checked_mul` reports that instead of panicking or wrapping.
+    let array = arith_decimal(left, right, |left, right| {
+        left.checked_mul(right)
+            .map(|product| product / divide)
+            .ok_or_else(|| {
+                DataFusionError::ArrowError(ArrowError::ComputeError(format!(
+                    "Overflow happened on: {:?} * {:?}",
+                    left, right
+                )))
+            })
+    })?
+    .with_precision_and_scale(left.precision(), left.scale())?;
+    Ok(array)
+}
+
+/// Multiplies every non-null element of `left` by the raw `i128` scalar
+/// `right`.
+///
+/// The raw product is divided by `10^scale` (truncating toward zero); the
+/// result keeps `left`'s precision and scale.
+///
+/// # Errors
+///
+/// Returns an error if the raw product overflows `i128`, or if applying
+/// `left`'s precision and scale to the result fails.
+pub(crate) fn multiply_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<Decimal128Array> {
+    let divide = 10_i128.pow(left.scale() as u32);
+    // `checked_mul` reports an overflowing intermediate product as an error
+    // instead of panicking (debug) or wrapping silently (release).
+    let array = arith_decimal_scalar(left, right, |left, right| {
+        left.checked_mul(right)
+            .map(|product| product / divide)
+            .ok_or_else(|| {
+                DataFusionError::ArrowError(ArrowError::ComputeError(format!(
+                    "Overflow happened on: {:?} * {:?}",
+                    left, right
+                )))
+            })
+    })?
+    .with_precision_and_scale(left.precision(), left.scale())?;
+    Ok(array)
+}
+
+/// Divides `left` by `right` element-wise; a null on either side yields null.
+///
+/// The quotient is computed in `f64`, scaled by `10^scale` and truncated back
+/// to `i128`. The result keeps `left`'s precision and scale.
+/// NOTE(review): the `i128 -> f64` conversion loses precision for magnitudes
+/// beyond 2^53.
+///
+/// # Errors
+///
+/// Returns `ArrowError::DivideByZero` if a non-null `right` value is zero
+/// (matching `modulus_decimal`), or an error if applying `left`'s precision
+/// and scale to the result fails.
+pub(crate) fn divide_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<Decimal128Array> {
+    let mul = 10_f64.powi(left.scale() as i32);
+    let array = arith_decimal(left, right, |left, right| {
+        if right == 0 {
+            // Dividing by 0.0 gives +/-inf or NaN, which the saturating
+            // `as i128` cast would silently turn into i128::MAX/MIN or 0.
+            return Err(DataFusionError::ArrowError(ArrowError::DivideByZero));
+        }
+        let l_value = left as f64;
+        let r_value = right as f64;
+        let result = ((l_value / r_value) * mul) as i128;
+        Ok(result)
+    })?
+    .with_precision_and_scale(left.precision(), left.scale())?;
+    Ok(array)
+}
+
+/// Divides every non-null element of `left` by the raw `i128` scalar `right`.
+///
+/// The quotient is computed in `f64`, scaled by `10^scale` and truncated back
+/// to `i128`; the result keeps `left`'s precision and scale.
+/// NOTE(review): the `i128 -> f64` conversion loses precision for magnitudes
+/// beyond 2^53.
+///
+/// # Errors
+///
+/// Returns `ArrowError::DivideByZero` if `right` is zero (matching
+/// `modulus_decimal_scalar`), or an error if applying `left`'s precision and
+/// scale to the result fails.
+pub(crate) fn divide_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<Decimal128Array> {
+    // Dividing by 0.0 gives +/-inf or NaN, which the saturating `as i128`
+    // cast would silently turn into i128::MAX/MIN or 0.
+    if right == 0 {
+        return Err(DataFusionError::ArrowError(ArrowError::DivideByZero));
+    }
+    let mul = 10_f64.powi(left.scale() as i32);
+    let array = arith_decimal_scalar(left, right, |left, right| {
+        let l_value = left as f64;
+        let r_value = right as f64;
+        let result = ((l_value / r_value) * mul) as i128;
+        Ok(result)
+    })?
+    .with_precision_and_scale(left.precision(), left.scale())?;
+    Ok(array)
+}
+
+/// Computes `left % right` element-wise on the raw `i128` values; a null on
+/// either side yields null. The result keeps `left`'s precision and scale.
+///
+/// # Errors
+///
+/// Returns `ArrowError::DivideByZero` when a non-null `right` value is zero.
+pub(crate) fn modulus_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<Decimal128Array> {
+    let remainders = arith_decimal(left, right, |dividend, divisor| match divisor {
+        0 => Err(DataFusionError::ArrowError(ArrowError::DivideByZero)),
+        _ => Ok(dividend % divisor),
+    })?;
+    let array = remainders.with_precision_and_scale(left.precision(), left.scale())?;
+    Ok(array)
+}
+
+/// Computes `left % right` for every non-null element of `left` and the raw
+/// `i128` scalar `right`, keeping `left`'s precision and scale.
+///
+/// # Errors
+///
+/// Returns `ArrowError::DivideByZero` when `right` is zero, even if `left` is
+/// empty or entirely null.
+pub(crate) fn modulus_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<Decimal128Array> {
+    match right {
+        0 => Err(DataFusionError::ArrowError(ArrowError::DivideByZero)),
+        divisor => {
+            let remainders =
+                arith_decimal_scalar(left, divisor, |dividend, divisor| Ok(dividend % divisor))?;
+            Ok(remainders.with_precision_and_scale(left.precision(), left.scale())?)
+        }
+    }
+}
+
+/// Unit tests for the decimal comparison and arithmetic kernels in this module.
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ /// Builds a `Decimal128Array` with the given precision and scale from raw
+ /// unscaled `i128` values; `None` entries become null slots.
+ fn create_decimal_array(
+ array: &[Option<i128>],
+ precision: usize,
+ scale: usize,
+ ) -> Decimal128Array {
+ let mut decimal_builder = Decimal128Builder::new(array.len(), precision, scale);
+ for value in array {
+ match value {
+ None => {
+ decimal_builder.append_null();
+ }
+ Some(v) => {
+ decimal_builder.append_value(*v).expect("valid value");
+ }
+ }
+ }
+ decimal_builder.finish()
+ }
+
+ #[test]
+ fn comparison_decimal_op_test() -> Result<()> {
+ let value_i128: i128 = 123;
+ let decimal_array = create_decimal_array(
+ &[
+ Some(value_i128),
+ None,
+ Some(value_i128 - 1),
+ Some(value_i128 + 1),
+ ],
+ 25,
+ 3,
+ );
+ // eq: array = i128
+ let result = eq_decimal_scalar(&decimal_array, value_i128)?;
+ assert_eq!(
+ BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
+ result
+ );
+ // neq: array != i128
+ let result = neq_decimal_scalar(&decimal_array, value_i128)?;
+ assert_eq!(
+ BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
+ result
+ );
+ // lt: array < i128
+ let result = lt_decimal_scalar(&decimal_array, value_i128)?;
+ assert_eq!(
+ BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
+ result
+ );
+ // lt_eq: array <= i128
+ let result = lt_eq_decimal_scalar(&decimal_array, value_i128)?;
+ assert_eq!(
+ BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
+ result
+ );
+ // gt: array > i128
+ let result = gt_decimal_scalar(&decimal_array, value_i128)?;
+ assert_eq!(
+ BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
+ result
+ );
+ // gt_eq: array >= i128
+ let result = gt_eq_decimal_scalar(&decimal_array, value_i128)?;
+ assert_eq!(
+ BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
+ result
+ );
+
+ let left_decimal_array = decimal_array;
+ let right_decimal_array = create_decimal_array(
+ &[
+ Some(value_i128 - 1),
+ Some(value_i128),
+ Some(value_i128 + 1),
+ Some(value_i128 + 1),
+ ],
+ 25,
+ 3,
+ );
+ // eq: left == right
+ let result = eq_decimal(&left_decimal_array, &right_decimal_array)?;
+ assert_eq!(
+ BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
+ result
+ );
+ // neq: left != right
+ let result = neq_decimal(&left_decimal_array, &right_decimal_array)?;
+ assert_eq!(
+ BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
+ result
+ );
+ // lt: left < right
+ let result = lt_decimal(&left_decimal_array, &right_decimal_array)?;
+ assert_eq!(
+ BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
+ result
+ );
+ // lt_eq: left <= right
+ let result = lt_eq_decimal(&left_decimal_array, &right_decimal_array)?;
+ assert_eq!(
+ BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
+ result
+ );
+ // gt: left > right
+ let result = gt_decimal(&left_decimal_array, &right_decimal_array)?;
+ assert_eq!(
+ BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
+ result
+ );
+ // gt_eq: left >= right
+ let result = gt_eq_decimal(&left_decimal_array, &right_decimal_array)?;
+ assert_eq!(
+ BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
+ result
+ );
+ // is_distinct: left distinct right
+ // (a NULL vs. a value is distinct, so row 1 is `true`, never NULL)
+ let result = is_distinct_from_decimal(&left_decimal_array, &right_decimal_array)?;
+ assert_eq!(
+ BooleanArray::from(vec![Some(true), Some(true), Some(true), Some(false)]),
+ result
+ );
+ // is_not_distinct: left not distinct right
+ // (the exact negation of the is_distinct result above; no NULLs)
+ let result =
+ is_not_distinct_from_decimal(&left_decimal_array, &right_decimal_array)?;
+ assert_eq!(
+ BooleanArray::from(vec![Some(false), Some(false), Some(false), Some(true)]),
+ result
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn arithmetic_decimal_op_test() -> Result<()> {
+ let value_i128: i128 = 123;
+ let left_decimal_array = create_decimal_array(
+ &[
+ Some(value_i128),
+ None,
+ Some(value_i128 - 1),
+ Some(value_i128 + 1),
+ ],
+ 25,
+ 3,
+ );
+ let right_decimal_array = create_decimal_array(
+ &[
+ Some(value_i128),
+ Some(value_i128),
+ Some(value_i128),
+ Some(value_i128),
+ ],
+ 25,
+ 3,
+ );
+ // add
+ let result = add_decimal(&left_decimal_array, &right_decimal_array)?;
+ let expect =
+ create_decimal_array(&[Some(246), None, Some(245), Some(247)], 25, 3);
+ assert_eq!(expect, result);
+ let result = add_decimal_scalar(&left_decimal_array, 10)?;
+ let expect =
+ create_decimal_array(&[Some(133), None, Some(132), Some(134)], 25, 3);
+ assert_eq!(expect, result);
+ // subtract
+ let result = subtract_decimal(&left_decimal_array, &right_decimal_array)?;
+ let expect = create_decimal_array(&[Some(0), None, Some(-1), Some(1)], 25, 3);
+ assert_eq!(expect, result);
+ let result = subtract_decimal_scalar(&left_decimal_array, 10)?;
+ let expect =
+ create_decimal_array(&[Some(113), None, Some(112), Some(114)], 25, 3);
+ assert_eq!(expect, result);
+ // multiply
+ // the raw product is divided by 10^3 and truncated, e.g.
+ // 0.123 * 0.123 = 0.015129 -> 0.015 (raw 123 * 123 / 1000 = 15)
+ let result = multiply_decimal(&left_decimal_array, &right_decimal_array)?;
+ let expect = create_decimal_array(&[Some(15), None, Some(15), Some(15)], 25, 3);
+ assert_eq!(expect, result);
+ let result = multiply_decimal_scalar(&left_decimal_array, 10)?;
+ let expect = create_decimal_array(&[Some(1), None, Some(1), Some(1)], 25, 3);
+ assert_eq!(expect, result);
+ // divide
+ // e.g. 1234.567 / 0.010 = 123456.7, i.e. raw 123456700 at scale 3
+ let left_decimal_array = create_decimal_array(
+ &[Some(1234567), None, Some(1234567), Some(1234567)],
+ 25,
+ 3,
+ );
+ let right_decimal_array =
+ create_decimal_array(&[Some(10), Some(100), Some(55), Some(-123)], 25, 3);
+ let result = divide_decimal(&left_decimal_array, &right_decimal_array)?;
+ let expect = create_decimal_array(
+ &[Some(123456700), None, Some(22446672), Some(-10037130)],
+ 25,
+ 3,
+ );
+ assert_eq!(expect, result);
+ let result = divide_decimal_scalar(&left_decimal_array, 10)?;
+ let expect = create_decimal_array(
+ &[Some(123456700), None, Some(123456700), Some(123456700)],
+ 25,
+ 3,
+ );
+ assert_eq!(expect, result);
+ // modulus
+ // remainders of the raw values; the sign follows the dividend, so
+ // 1234567 % -123 = 16
+ let result = modulus_decimal(&left_decimal_array, &right_decimal_array)?;
+ let expect = create_decimal_array(&[Some(7), None, Some(37), Some(16)], 25, 3);
+ assert_eq!(expect, result);
+ let result = modulus_decimal_scalar(&left_decimal_array, 10)?;
+ let expect = create_decimal_array(&[Some(7), None, Some(7), Some(7)], 25, 3);
+ assert_eq!(expect, result);
+
+ Ok(())
+ }
+}