You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/08/04 16:33:45 UTC

[arrow-datafusion] branch master updated: Split binary_expr.rs into smaller modules (#3026)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d07a775c5 Split binary_expr.rs into smaller modules (#3026)
d07a775c5 is described below

commit d07a775c55091195a3f615db61e49b1ef97ce5ba
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Aug 4 12:33:40 2022 -0400

    Split binary_expr.rs into smaller modules (#3026)
---
 datafusion/physical-expr/src/expressions/binary.rs | 791 +--------------------
 .../src/expressions/binary/adapter.rs              |  62 ++
 .../src/expressions/binary/kernels.rs              | 191 +++++
 .../src/expressions/binary/kernels_arrow.rs        | 647 +++++++++++++++++
 4 files changed, 921 insertions(+), 770 deletions(-)

diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index b7492445b..004628a80 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -15,10 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod adapter;
+mod kernels;
+mod kernels_arrow;
+
 use std::convert::TryInto;
 use std::{any::Any, sync::Arc};
 
-use arrow::array::TimestampMillisecondArray;
 use arrow::array::*;
 use arrow::compute::kernels::arithmetic::{
     add, add_scalar, divide, divide_scalar, modulus, modulus_scalar, multiply,
@@ -44,8 +47,23 @@ use arrow::compute::kernels::comparison::{like_utf8, nlike_utf8, regexp_is_match
 use arrow::compute::kernels::comparison::{
     like_utf8_scalar, nlike_utf8_scalar, regexp_is_match_utf8_scalar,
 };
-use arrow::datatypes::{ArrowNumericType, DataType, Schema, TimeUnit};
-use arrow::error::ArrowError::DivideByZero;
+
+use adapter::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn};
+use kernels::{
+    bitwise_and, bitwise_and_scalar, bitwise_or, bitwise_or_scalar, string_concat,
+};
+use kernels_arrow::{
+    add_decimal, add_decimal_scalar, divide_decimal, divide_decimal_scalar,
+    eq_decimal_scalar, gt_decimal_scalar, gt_eq_decimal_scalar, is_distinct_from,
+    is_distinct_from_bool, is_distinct_from_decimal, is_distinct_from_null,
+    is_distinct_from_utf8, is_not_distinct_from, is_not_distinct_from_bool,
+    is_not_distinct_from_decimal, is_not_distinct_from_null, is_not_distinct_from_utf8,
+    lt_decimal_scalar, lt_eq_decimal_scalar, modulus_decimal, modulus_decimal_scalar,
+    multiply_decimal, multiply_decimal_scalar, neq_decimal_scalar, subtract_decimal,
+    subtract_decimal_scalar,
+};
+
+use arrow::datatypes::{DataType, Schema, TimeUnit};
 use arrow::record_batch::RecordBatch;
 
 use crate::expressions::try_cast;
@@ -55,513 +73,6 @@ use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::binary_rule::binary_operator_data_type;
 use datafusion_expr::{binary_rule::coerce_types, ColumnarValue, Operator};
 
-/// create a `dyn_op` wrapper function for the specified operation
-/// that call the underlying dyn_op arrow kernel if the type is
-/// supported, and translates ArrowError to DataFusionError
-macro_rules! make_dyn_comp_op {
-    ($OP:tt) => {
-        paste::paste! {
-            /// wrapper over arrow compute kernel that maps Error types and
-            /// patches missing support in arrow
-            fn [<$OP _dyn>] (left: &dyn Array, right: &dyn Array) -> Result<ArrayRef> {
-                match (left.data_type(), right.data_type()) {
-                    // Call `op_decimal` (e.g. `eq_decimal) until
-                    // arrow has native support
-                    // https://github.com/apache/arrow-rs/issues/1200
-                    (DataType::Decimal(_, _), DataType::Decimal(_, _)) => {
-                        [<$OP _decimal>](as_decimal_array(left), as_decimal_array(right))
-                    },
-                    // By default call the arrow kernel
-                    _ => {
-                    arrow::compute::kernels::comparison::[<$OP _dyn>](left, right)
-                            .map_err(|e| e.into())
-                    }
-                }
-                .map(|a| Arc::new(a) as ArrayRef)
-            }
-        }
-    };
-}
-
-// create eq_dyn, gt_dyn, wrappers etc
-make_dyn_comp_op!(eq);
-make_dyn_comp_op!(gt);
-make_dyn_comp_op!(gt_eq);
-make_dyn_comp_op!(lt);
-make_dyn_comp_op!(lt_eq);
-make_dyn_comp_op!(neq);
-
-// Simple (low performance) kernels until optimized kernels are added to arrow
-// See https://github.com/apache/arrow-rs/issues/960
-
-fn is_distinct_from_bool(
-    left: &BooleanArray,
-    right: &BooleanArray,
-) -> Result<BooleanArray> {
-    // Different from `neq_bool` because `null is distinct from null` is false and not null
-    Ok(left
-        .iter()
-        .zip(right.iter())
-        .map(|(left, right)| Some(left != right))
-        .collect())
-}
-
-fn is_not_distinct_from_bool(
-    left: &BooleanArray,
-    right: &BooleanArray,
-) -> Result<BooleanArray> {
-    Ok(left
-        .iter()
-        .zip(right.iter())
-        .map(|(left, right)| Some(left == right))
-        .collect())
-}
-
-// TODO move decimal kernels to to arrow-rs
-// https://github.com/apache/arrow-rs/issues/1200
-
-/// Creates an BooleanArray the same size as `left`,
-/// applying `op` to all non-null elements of left
-fn compare_decimal_scalar<F>(
-    left: &Decimal128Array,
-    right: i128,
-    op: F,
-) -> Result<BooleanArray>
-where
-    F: Fn(i128, i128) -> bool,
-{
-    Ok(left
-        .iter()
-        .map(|left| left.map(|left| op(left, right)))
-        .collect())
-}
-
-/// Creates an BooleanArray the same size as `left`,
-/// by applying `op` to all non-null elements of left and right
-fn compare_decimal<F>(
-    left: &Decimal128Array,
-    right: &Decimal128Array,
-    op: F,
-) -> Result<BooleanArray>
-where
-    F: Fn(i128, i128) -> bool,
-{
-    Ok(left
-        .iter()
-        .zip(right.iter())
-        .map(|(left, right)| {
-            if let (Some(left), Some(right)) = (left, right) {
-                Some(op(left, right))
-            } else {
-                None
-            }
-        })
-        .collect())
-}
-
-pub(super) fn eq_decimal_scalar(
-    left: &Decimal128Array,
-    right: i128,
-) -> Result<BooleanArray> {
-    compare_decimal_scalar(left, right, |left, right| left == right)
-}
-
-pub(super) fn eq_decimal(
-    left: &Decimal128Array,
-    right: &Decimal128Array,
-) -> Result<BooleanArray> {
-    compare_decimal(left, right, |left, right| left == right)
-}
-
-fn neq_decimal_scalar(left: &Decimal128Array, right: i128) -> Result<BooleanArray> {
-    compare_decimal_scalar(left, right, |left, right| left != right)
-}
-
-fn neq_decimal(left: &Decimal128Array, right: &Decimal128Array) -> Result<BooleanArray> {
-    compare_decimal(left, right, |left, right| left != right)
-}
-
-fn lt_decimal_scalar(left: &Decimal128Array, right: i128) -> Result<BooleanArray> {
-    compare_decimal_scalar(left, right, |left, right| left < right)
-}
-
-fn lt_decimal(left: &Decimal128Array, right: &Decimal128Array) -> Result<BooleanArray> {
-    compare_decimal(left, right, |left, right| left < right)
-}
-
-fn lt_eq_decimal_scalar(left: &Decimal128Array, right: i128) -> Result<BooleanArray> {
-    compare_decimal_scalar(left, right, |left, right| left <= right)
-}
-
-fn lt_eq_decimal(
-    left: &Decimal128Array,
-    right: &Decimal128Array,
-) -> Result<BooleanArray> {
-    compare_decimal(left, right, |left, right| left <= right)
-}
-
-fn gt_decimal_scalar(left: &Decimal128Array, right: i128) -> Result<BooleanArray> {
-    compare_decimal_scalar(left, right, |left, right| left > right)
-}
-
-fn gt_decimal(left: &Decimal128Array, right: &Decimal128Array) -> Result<BooleanArray> {
-    compare_decimal(left, right, |left, right| left > right)
-}
-
-fn gt_eq_decimal_scalar(left: &Decimal128Array, right: i128) -> Result<BooleanArray> {
-    compare_decimal_scalar(left, right, |left, right| left >= right)
-}
-
-fn gt_eq_decimal(
-    left: &Decimal128Array,
-    right: &Decimal128Array,
-) -> Result<BooleanArray> {
-    compare_decimal(left, right, |left, right| left >= right)
-}
-
-fn is_distinct_from_decimal(
-    left: &Decimal128Array,
-    right: &Decimal128Array,
-) -> Result<BooleanArray> {
-    Ok(left
-        .iter()
-        .zip(right.iter())
-        .map(|(left, right)| match (left, right) {
-            (None, None) => Some(false),
-            (None, Some(_)) | (Some(_), None) => Some(true),
-            (Some(left), Some(right)) => Some(left != right),
-        })
-        .collect())
-}
-
-fn is_not_distinct_from_decimal(
-    left: &Decimal128Array,
-    right: &Decimal128Array,
-) -> Result<BooleanArray> {
-    Ok(left
-        .iter()
-        .zip(right.iter())
-        .map(|(left, right)| match (left, right) {
-            (None, None) => Some(true),
-            (None, Some(_)) | (Some(_), None) => Some(false),
-            (Some(left), Some(right)) => Some(left == right),
-        })
-        .collect())
-}
-
-/// Creates an Decimal128Array the same size as `left`,
-/// by applying `op` to all non-null elements of left and right
-fn arith_decimal<F>(
-    left: &Decimal128Array,
-    right: &Decimal128Array,
-    op: F,
-) -> Result<Decimal128Array>
-where
-    F: Fn(i128, i128) -> Result<i128>,
-{
-    left.iter()
-        .zip(right.iter())
-        .map(|(left, right)| {
-            if let (Some(left), Some(right)) = (left, right) {
-                Some(op(left, right)).transpose()
-            } else {
-                Ok(None)
-            }
-        })
-        .collect()
-}
-
-fn arith_decimal_scalar<F>(
-    left: &Decimal128Array,
-    right: i128,
-    op: F,
-) -> Result<Decimal128Array>
-where
-    F: Fn(i128, i128) -> Result<i128>,
-{
-    left.iter()
-        .map(|left| {
-            if let Some(left) = left {
-                Some(op(left, right)).transpose()
-            } else {
-                Ok(None)
-            }
-        })
-        .collect()
-}
-
-fn add_decimal(
-    left: &Decimal128Array,
-    right: &Decimal128Array,
-) -> Result<Decimal128Array> {
-    let array = arith_decimal(left, right, |left, right| Ok(left + right))?
-        .with_precision_and_scale(left.precision(), left.scale())?;
-    Ok(array)
-}
-
-fn add_decimal_scalar(left: &Decimal128Array, right: i128) -> Result<Decimal128Array> {
-    let array = arith_decimal_scalar(left, right, |left, right| Ok(left + right))?
-        .with_precision_and_scale(left.precision(), left.scale())?;
-    Ok(array)
-}
-
-fn subtract_decimal(
-    left: &Decimal128Array,
-    right: &Decimal128Array,
-) -> Result<Decimal128Array> {
-    let array = arith_decimal(left, right, |left, right| Ok(left - right))?
-        .with_precision_and_scale(left.precision(), left.scale())?;
-    Ok(array)
-}
-
-fn subtract_decimal_scalar(
-    left: &Decimal128Array,
-    right: i128,
-) -> Result<Decimal128Array> {
-    let array = arith_decimal_scalar(left, right, |left, right| Ok(left - right))?
-        .with_precision_and_scale(left.precision(), left.scale())?;
-    Ok(array)
-}
-
-fn multiply_decimal(
-    left: &Decimal128Array,
-    right: &Decimal128Array,
-) -> Result<Decimal128Array> {
-    let divide = 10_i128.pow(left.scale() as u32);
-    let array = arith_decimal(left, right, |left, right| Ok(left * right / divide))?
-        .with_precision_and_scale(left.precision(), left.scale())?;
-    Ok(array)
-}
-
-fn multiply_decimal_scalar(
-    left: &Decimal128Array,
-    right: i128,
-) -> Result<Decimal128Array> {
-    let divide = 10_i128.pow(left.scale() as u32);
-    let array =
-        arith_decimal_scalar(left, right, |left, right| Ok(left * right / divide))?
-            .with_precision_and_scale(left.precision(), left.scale())?;
-    Ok(array)
-}
-
-fn divide_decimal(
-    left: &Decimal128Array,
-    right: &Decimal128Array,
-) -> Result<Decimal128Array> {
-    let mul = 10_f64.powi(left.scale() as i32);
-    let array = arith_decimal(left, right, |left, right| {
-        let l_value = left as f64;
-        let r_value = right as f64;
-        let result = ((l_value / r_value) * mul) as i128;
-        Ok(result)
-    })?
-    .with_precision_and_scale(left.precision(), left.scale())?;
-    Ok(array)
-}
-
-fn divide_decimal_scalar(left: &Decimal128Array, right: i128) -> Result<Decimal128Array> {
-    let mul = 10_f64.powi(left.scale() as i32);
-    let array = arith_decimal_scalar(left, right, |left, right| {
-        let l_value = left as f64;
-        let r_value = right as f64;
-        let result = ((l_value / r_value) * mul) as i128;
-        Ok(result)
-    })?
-    .with_precision_and_scale(left.precision(), left.scale())?;
-    Ok(array)
-}
-
-fn modulus_decimal(
-    left: &Decimal128Array,
-    right: &Decimal128Array,
-) -> Result<Decimal128Array> {
-    let array = arith_decimal(left, right, |left, right| {
-        if right == 0 {
-            Err(DataFusionError::ArrowError(DivideByZero))
-        } else {
-            Ok(left % right)
-        }
-    })?
-    .with_precision_and_scale(left.precision(), left.scale())?;
-    Ok(array)
-}
-
-fn modulus_decimal_scalar(
-    left: &Decimal128Array,
-    right: i128,
-) -> Result<Decimal128Array> {
-    if right == 0 {
-        return Err(DataFusionError::ArrowError(DivideByZero));
-    }
-    let array = arith_decimal_scalar(left, right, |left, right| Ok(left % right))?
-        .with_precision_and_scale(left.precision(), left.scale())?;
-    Ok(array)
-}
-
-/// The binary_bitwise_array_op macro only evaluates for integer types
-/// like int64, int32.
-/// It is used to do bitwise operation.
-macro_rules! binary_bitwise_array_op {
-    ($LEFT:expr, $RIGHT:expr, $OP:tt, $ARRAY_TYPE:ident, $TYPE:ty) => {{
-        let len = $LEFT.len();
-        let left = $LEFT.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
-        let right = $RIGHT.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
-        let result = (0..len)
-            .into_iter()
-            .map(|i| {
-                if left.is_null(i) || right.is_null(i) {
-                    None
-                } else {
-                    Some(left.value(i) $OP right.value(i))
-                }
-            })
-            .collect::<$ARRAY_TYPE>();
-        Ok(Arc::new(result))
-    }};
-}
-
-/// The binary_bitwise_array_op macro only evaluates for integer types
-/// like int64, int32.
-/// It is used to do bitwise operation on an array with a scalar.
-macro_rules! binary_bitwise_array_scalar {
-    ($LEFT:expr, $RIGHT:expr, $OP:tt, $ARRAY_TYPE:ident, $TYPE:ty) => {{
-        let len = $LEFT.len();
-        let array = $LEFT.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
-        let scalar = $RIGHT;
-        if scalar.is_null() {
-            Ok(new_null_array(array.data_type(), len))
-        } else {
-            let right: $TYPE = scalar.try_into().unwrap();
-            let result = (0..len)
-                .into_iter()
-                .map(|i| {
-                    if array.is_null(i) {
-                        None
-                    } else {
-                        Some(array.value(i) $OP right)
-                    }
-                })
-                .collect::<$ARRAY_TYPE>();
-            Ok(Arc::new(result) as ArrayRef)
-        }
-    }};
-}
-
-fn bitwise_and(left: ArrayRef, right: ArrayRef) -> Result<ArrayRef> {
-    match &left.data_type() {
-        DataType::Int8 => {
-            binary_bitwise_array_op!(left, right, &, Int8Array, i8)
-        }
-        DataType::Int16 => {
-            binary_bitwise_array_op!(left, right, &, Int16Array, i16)
-        }
-        DataType::Int32 => {
-            binary_bitwise_array_op!(left, right, &, Int32Array, i32)
-        }
-        DataType::Int64 => {
-            binary_bitwise_array_op!(left, right, &, Int64Array, i64)
-        }
-        other => Err(DataFusionError::Internal(format!(
-            "Data type {:?} not supported for binary operation '{}' on dyn arrays",
-            other,
-            Operator::BitwiseAnd
-        ))),
-    }
-}
-
-fn bitwise_or(left: ArrayRef, right: ArrayRef) -> Result<ArrayRef> {
-    match &left.data_type() {
-        DataType::Int8 => {
-            binary_bitwise_array_op!(left, right, |, Int8Array, i8)
-        }
-        DataType::Int16 => {
-            binary_bitwise_array_op!(left, right, |, Int16Array, i16)
-        }
-        DataType::Int32 => {
-            binary_bitwise_array_op!(left, right, |, Int32Array, i32)
-        }
-        DataType::Int64 => {
-            binary_bitwise_array_op!(left, right, |, Int64Array, i64)
-        }
-        other => Err(DataFusionError::Internal(format!(
-            "Data type {:?} not supported for binary operation '{}' on dyn arrays",
-            other,
-            Operator::BitwiseOr
-        ))),
-    }
-}
-
-/// Concat lhs and rhs String Array, any `NULL` exists on lhs or rhs will come out result `NULL`
-/// 1. 'a' || 'b' || 32 = 'ab32'
-/// 2. 'a' || NULL = NULL
-fn string_concat(left: ArrayRef, right: ArrayRef) -> Result<ArrayRef> {
-    let left_array = left.as_any().downcast_ref::<StringArray>().unwrap();
-    let right_array = right.as_any().downcast_ref::<StringArray>().unwrap();
-    let result = (0..left.len())
-        .into_iter()
-        .map(|i| {
-            if left.is_null(i) || right.is_null(i) {
-                None
-            } else {
-                let mut owned_string: String = "".to_owned();
-                owned_string.push_str(left_array.value(i));
-                owned_string.push_str(right_array.value(i));
-                Some(owned_string)
-            }
-        })
-        .collect::<StringArray>();
-    Ok(Arc::new(result) as ArrayRef)
-}
-
-fn bitwise_and_scalar(
-    array: &dyn Array,
-    scalar: ScalarValue,
-) -> Option<Result<ArrayRef>> {
-    let result = match array.data_type() {
-        DataType::Int8 => {
-            binary_bitwise_array_scalar!(array, scalar, &, Int8Array, i8)
-        }
-        DataType::Int16 => {
-            binary_bitwise_array_scalar!(array, scalar, &, Int16Array, i16)
-        }
-        DataType::Int32 => {
-            binary_bitwise_array_scalar!(array, scalar, &, Int32Array, i32)
-        }
-        DataType::Int64 => {
-            binary_bitwise_array_scalar!(array, scalar, &, Int64Array, i64)
-        }
-        other => Err(DataFusionError::Internal(format!(
-            "Data type {:?} not supported for binary operation '{}' on dyn arrays",
-            other,
-            Operator::BitwiseAnd
-        ))),
-    };
-    Some(result)
-}
-
-fn bitwise_or_scalar(array: &dyn Array, scalar: ScalarValue) -> Option<Result<ArrayRef>> {
-    let result = match array.data_type() {
-        DataType::Int8 => {
-            binary_bitwise_array_scalar!(array, scalar, |, Int8Array, i8)
-        }
-        DataType::Int16 => {
-            binary_bitwise_array_scalar!(array, scalar, |, Int16Array, i16)
-        }
-        DataType::Int32 => {
-            binary_bitwise_array_scalar!(array, scalar, |, Int32Array, i32)
-        }
-        DataType::Int64 => {
-            binary_bitwise_array_scalar!(array, scalar, |, Int64Array, i64)
-        }
-        other => Err(DataFusionError::Internal(format!(
-            "Data type {:?} not supported for binary operation '{}' on dyn arrays",
-            other,
-            Operator::BitwiseOr
-        ))),
-    };
-    Some(result)
-}
-
 /// Binary expression
 #[derive(Debug)]
 pub struct BinaryExpr {
@@ -1345,73 +856,6 @@ impl BinaryExpr {
     }
 }
 
-fn is_distinct_from<T>(
-    left: &PrimitiveArray<T>,
-    right: &PrimitiveArray<T>,
-) -> Result<BooleanArray>
-where
-    T: ArrowNumericType,
-{
-    Ok(left
-        .iter()
-        .zip(right.iter())
-        .map(|(x, y)| Some(x != y))
-        .collect())
-}
-
-fn is_distinct_from_utf8<OffsetSize: OffsetSizeTrait>(
-    left: &GenericStringArray<OffsetSize>,
-    right: &GenericStringArray<OffsetSize>,
-) -> Result<BooleanArray> {
-    Ok(left
-        .iter()
-        .zip(right.iter())
-        .map(|(x, y)| Some(x != y))
-        .collect())
-}
-
-fn is_distinct_from_null(left: &NullArray, _right: &NullArray) -> Result<BooleanArray> {
-    let length = left.len();
-    make_boolean_array(length, false)
-}
-
-fn is_not_distinct_from_null(
-    left: &NullArray,
-    _right: &NullArray,
-) -> Result<BooleanArray> {
-    let length = left.len();
-    make_boolean_array(length, true)
-}
-
-fn make_boolean_array(length: usize, value: bool) -> Result<BooleanArray> {
-    Ok((0..length).into_iter().map(|_| Some(value)).collect())
-}
-
-fn is_not_distinct_from<T>(
-    left: &PrimitiveArray<T>,
-    right: &PrimitiveArray<T>,
-) -> Result<BooleanArray>
-where
-    T: ArrowNumericType,
-{
-    Ok(left
-        .iter()
-        .zip(right.iter())
-        .map(|(x, y)| Some(x == y))
-        .collect())
-}
-
-fn is_not_distinct_from_utf8<OffsetSize: OffsetSizeTrait>(
-    left: &GenericStringArray<OffsetSize>,
-    right: &GenericStringArray<OffsetSize>,
-) -> Result<BooleanArray> {
-    Ok(left
-        .iter()
-        .zip(right.iter())
-        .map(|(x, y)| Some(x == y))
-        .collect())
-}
-
 /// return two physical expressions that are optionally coerced to a
 /// common type that the binary operator supports.
 fn binary_cast(
@@ -2674,119 +2118,6 @@ mod tests {
         decimal_builder.finish()
     }
 
-    #[test]
-    fn comparison_decimal_op_test() -> Result<()> {
-        let value_i128: i128 = 123;
-        let decimal_array = create_decimal_array(
-            &[
-                Some(value_i128),
-                None,
-                Some(value_i128 - 1),
-                Some(value_i128 + 1),
-            ],
-            25,
-            3,
-        );
-        // eq: array = i128
-        let result = eq_decimal_scalar(&decimal_array, value_i128)?;
-        assert_eq!(
-            BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
-            result
-        );
-        // neq: array != i128
-        let result = neq_decimal_scalar(&decimal_array, value_i128)?;
-        assert_eq!(
-            BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
-            result
-        );
-        // lt: array < i128
-        let result = lt_decimal_scalar(&decimal_array, value_i128)?;
-        assert_eq!(
-            BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
-            result
-        );
-        // lt_eq: array <= i128
-        let result = lt_eq_decimal_scalar(&decimal_array, value_i128)?;
-        assert_eq!(
-            BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
-            result
-        );
-        // gt: array > i128
-        let result = gt_decimal_scalar(&decimal_array, value_i128)?;
-        assert_eq!(
-            BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
-            result
-        );
-        // gt_eq: array >= i128
-        let result = gt_eq_decimal_scalar(&decimal_array, value_i128)?;
-        assert_eq!(
-            BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
-            result
-        );
-
-        let left_decimal_array = decimal_array;
-        let right_decimal_array = create_decimal_array(
-            &[
-                Some(value_i128 - 1),
-                Some(value_i128),
-                Some(value_i128 + 1),
-                Some(value_i128 + 1),
-            ],
-            25,
-            3,
-        );
-        // eq: left == right
-        let result = eq_decimal(&left_decimal_array, &right_decimal_array)?;
-        assert_eq!(
-            BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
-            result
-        );
-        // neq: left != right
-        let result = neq_decimal(&left_decimal_array, &right_decimal_array)?;
-        assert_eq!(
-            BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
-            result
-        );
-        // lt: left < right
-        let result = lt_decimal(&left_decimal_array, &right_decimal_array)?;
-        assert_eq!(
-            BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
-            result
-        );
-        // lt_eq: left <= right
-        let result = lt_eq_decimal(&left_decimal_array, &right_decimal_array)?;
-        assert_eq!(
-            BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
-            result
-        );
-        // gt: left > right
-        let result = gt_decimal(&left_decimal_array, &right_decimal_array)?;
-        assert_eq!(
-            BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
-            result
-        );
-        // gt_eq: left >= right
-        let result = gt_eq_decimal(&left_decimal_array, &right_decimal_array)?;
-        assert_eq!(
-            BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
-            result
-        );
-        // is_distinct: left distinct right
-        let result = is_distinct_from_decimal(&left_decimal_array, &right_decimal_array)?;
-        assert_eq!(
-            BooleanArray::from(vec![Some(true), Some(true), Some(true), Some(false)]),
-            result
-        );
-        // is_distinct: left distinct right
-        let result =
-            is_not_distinct_from_decimal(&left_decimal_array, &right_decimal_array)?;
-        assert_eq!(
-            BooleanArray::from(vec![Some(false), Some(false), Some(false), Some(true)]),
-            result
-        );
-        Ok(())
-    }
-
     #[test]
     fn comparison_decimal_expr_test() -> Result<()> {
         let decimal_scalar = ScalarValue::Decimal128(Some(123_456), 10, 3);
@@ -3000,86 +2331,6 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn arithmetic_decimal_op_test() -> Result<()> {
-        let value_i128: i128 = 123;
-        let left_decimal_array = create_decimal_array(
-            &[
-                Some(value_i128),
-                None,
-                Some(value_i128 - 1),
-                Some(value_i128 + 1),
-            ],
-            25,
-            3,
-        );
-        let right_decimal_array = create_decimal_array(
-            &[
-                Some(value_i128),
-                Some(value_i128),
-                Some(value_i128),
-                Some(value_i128),
-            ],
-            25,
-            3,
-        );
-        // add
-        let result = add_decimal(&left_decimal_array, &right_decimal_array)?;
-        let expect =
-            create_decimal_array(&[Some(246), None, Some(245), Some(247)], 25, 3);
-        assert_eq!(expect, result);
-        let result = add_decimal_scalar(&left_decimal_array, 10)?;
-        let expect =
-            create_decimal_array(&[Some(133), None, Some(132), Some(134)], 25, 3);
-        assert_eq!(expect, result);
-        // subtract
-        let result = subtract_decimal(&left_decimal_array, &right_decimal_array)?;
-        let expect = create_decimal_array(&[Some(0), None, Some(-1), Some(1)], 25, 3);
-        assert_eq!(expect, result);
-        let result = subtract_decimal_scalar(&left_decimal_array, 10)?;
-        let expect =
-            create_decimal_array(&[Some(113), None, Some(112), Some(114)], 25, 3);
-        assert_eq!(expect, result);
-        // multiply
-        let result = multiply_decimal(&left_decimal_array, &right_decimal_array)?;
-        let expect = create_decimal_array(&[Some(15), None, Some(15), Some(15)], 25, 3);
-        assert_eq!(expect, result);
-        let result = multiply_decimal_scalar(&left_decimal_array, 10)?;
-        let expect = create_decimal_array(&[Some(1), None, Some(1), Some(1)], 25, 3);
-        assert_eq!(expect, result);
-        // divide
-        let left_decimal_array = create_decimal_array(
-            &[Some(1234567), None, Some(1234567), Some(1234567)],
-            25,
-            3,
-        );
-        let right_decimal_array =
-            create_decimal_array(&[Some(10), Some(100), Some(55), Some(-123)], 25, 3);
-        let result = divide_decimal(&left_decimal_array, &right_decimal_array)?;
-        let expect = create_decimal_array(
-            &[Some(123456700), None, Some(22446672), Some(-10037130)],
-            25,
-            3,
-        );
-        assert_eq!(expect, result);
-        let result = divide_decimal_scalar(&left_decimal_array, 10)?;
-        let expect = create_decimal_array(
-            &[Some(123456700), None, Some(123456700), Some(123456700)],
-            25,
-            3,
-        );
-        assert_eq!(expect, result);
-        // modulus
-        let result = modulus_decimal(&left_decimal_array, &right_decimal_array)?;
-        let expect = create_decimal_array(&[Some(7), None, Some(37), Some(16)], 25, 3);
-        assert_eq!(expect, result);
-        let result = modulus_decimal_scalar(&left_decimal_array, 10)?;
-        let expect = create_decimal_array(&[Some(7), None, Some(7), Some(7)], 25, 3);
-        assert_eq!(expect, result);
-
-        Ok(())
-    }
-
     fn apply_arithmetic_op(
         schema: &SchemaRef,
         left: &ArrayRef,
diff --git a/datafusion/physical-expr/src/expressions/binary/adapter.rs b/datafusion/physical-expr/src/expressions/binary/adapter.rs
new file mode 100644
index 000000000..b0293cdf0
--- /dev/null
+++ b/datafusion/physical-expr/src/expressions/binary/adapter.rs
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains functions that change types or names of other
+//! kernels to make them compatible with the main dispatch logic
+
+use std::sync::Arc;
+
+use super::kernels_arrow::*;
+use arrow::array::*;
+use arrow::datatypes::DataType;
+use datafusion_common::Result;
+
+/// Creates a `<op>_dyn` wrapper function for the specified operation
+/// that calls `<op>_decimal` for a pair of decimal arrays and the arrow
+/// `<op>_dyn` kernel otherwise, translating ArrowError to DataFusionError
+macro_rules! make_dyn_comp_op {
+    ($OP:tt) => {
+        paste::paste! {
+            /// Wrapper over the arrow compute kernel that maps error types and
+            /// patches missing decimal support in arrow
+            pub(crate) fn [<$OP _dyn>] (left: &dyn Array, right: &dyn Array) -> Result<ArrayRef> {
+                match (left.data_type(), right.data_type()) {
+                    // Call `op_decimal` (e.g. `eq_decimal`) until
+                    // arrow has native support, see
+                    // https://github.com/apache/arrow-rs/issues/1200
+                    (DataType::Decimal(_, _), DataType::Decimal(_, _)) => {
+                        [<$OP _decimal>](as_decimal_array(left), as_decimal_array(right))
+                    },
+                    // For every other type combination, call the arrow kernel
+                    _ => {
+                    arrow::compute::kernels::comparison::[<$OP _dyn>](left, right)
+                            .map_err(|e| e.into())
+                    }
+                }
+                .map(|a| Arc::new(a) as ArrayRef)
+            }
+        }
+    };
+}
+
+// create eq_dyn, gt_dyn, wrappers etc
+make_dyn_comp_op!(eq);
+make_dyn_comp_op!(gt);
+make_dyn_comp_op!(gt_eq);
+make_dyn_comp_op!(lt);
+make_dyn_comp_op!(lt_eq);
+make_dyn_comp_op!(neq);
diff --git a/datafusion/physical-expr/src/expressions/binary/kernels.rs b/datafusion/physical-expr/src/expressions/binary/kernels.rs
new file mode 100644
index 000000000..e234815af
--- /dev/null
+++ b/datafusion/physical-expr/src/expressions/binary/kernels.rs
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains computation kernels that are specific to
+//! datafusion and not (yet) targeted for porting upstream to arrow
+use arrow::array::*;
+use arrow::datatypes::DataType;
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::Operator;
+use std::sync::Arc;
+
+/// Applies the binary operator `$OP` slot by slot to `$LEFT` and `$RIGHT`,
+/// both downcast to `$ARRAY_TYPE`, and evaluates to `Ok(Arc<$ARRAY_TYPE>)`.
+///
+/// It is used to do bitwise operations on integer arrays such as
+/// `Int32Array` and `Int64Array`. An output slot is null when the slot is
+/// null in either input.
+///
+/// The expansion panics if either input is not a `$ARRAY_TYPE`. `$TYPE` is
+/// accepted but is not used by the expansion.
+macro_rules! binary_bitwise_array_op {
+    ($LEFT:expr, $RIGHT:expr, $OP:tt, $ARRAY_TYPE:ident, $TYPE:ty) => {{
+        // The number of output rows is taken from the left input
+        let len = $LEFT.len();
+        let left = $LEFT.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
+        let right = $RIGHT.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
+        let result = (0..len)
+            .into_iter()
+            .map(|i| {
+                // Null on either side yields a null output slot
+                if left.is_null(i) || right.is_null(i) {
+                    None
+                } else {
+                    Some(left.value(i) $OP right.value(i))
+                }
+            })
+            .collect::<$ARRAY_TYPE>();
+        Ok(Arc::new(result))
+    }};
+}
+
+/// The binary_bitwise_array_scalar macro only evaluates for integer types
+/// like int64, int32.
+/// It is used to do bitwise operation on an array with a scalar.
+///
+/// `$LEFT` is downcast to `$ARRAY_TYPE` and `$RIGHT` is the scalar operand.
+/// A null scalar (`is_null()`) gives an all-null array of the same data type
+/// and length; otherwise the scalar is converted to `$TYPE` with `try_into`
+/// and combined with every non-null slot using `$OP`.
+///
+/// The expansion panics if the downcast or the scalar conversion fails.
+macro_rules! binary_bitwise_array_scalar {
+    ($LEFT:expr, $RIGHT:expr, $OP:tt, $ARRAY_TYPE:ident, $TYPE:ty) => {{
+        let len = $LEFT.len();
+        let array = $LEFT.as_any().downcast_ref::<$ARRAY_TYPE>().unwrap();
+        let scalar = $RIGHT;
+        if scalar.is_null() {
+            // `array $OP NULL` is NULL for every row
+            Ok(new_null_array(array.data_type(), len))
+        } else {
+            let right: $TYPE = scalar.try_into().unwrap();
+            let result = (0..len)
+                .into_iter()
+                .map(|i| {
+                    // Null array slots stay null
+                    if array.is_null(i) {
+                        None
+                    } else {
+                        Some(array.value(i) $OP right)
+                    }
+                })
+                .collect::<$ARRAY_TYPE>();
+            Ok(Arc::new(result) as ArrayRef)
+        }
+    }};
+}
+
+/// Element-wise bitwise AND (`&`) of two arrays.
+///
+/// The kernel is chosen from the data type of `left`: `Int8`, `Int16`,
+/// `Int32` and `Int64` are handled, and any other type produces a
+/// `DataFusionError::Internal`. Slots that are null in either input are
+/// null in the output.
+pub(crate) fn bitwise_and(left: ArrayRef, right: ArrayRef) -> Result<ArrayRef> {
+    let data_type = left.data_type();
+    match data_type {
+        DataType::Int8 => binary_bitwise_array_op!(left, right, &, Int8Array, i8),
+        DataType::Int16 => binary_bitwise_array_op!(left, right, &, Int16Array, i16),
+        DataType::Int32 => binary_bitwise_array_op!(left, right, &, Int32Array, i32),
+        DataType::Int64 => binary_bitwise_array_op!(left, right, &, Int64Array, i64),
+        unsupported => Err(DataFusionError::Internal(format!(
+            "Data type {:?} not supported for binary operation '{}' on dyn arrays",
+            unsupported,
+            Operator::BitwiseAnd
+        ))),
+    }
+}
+
+/// Element-wise bitwise OR (`|`) of two arrays.
+///
+/// The kernel is chosen from the data type of `left`: `Int8`, `Int16`,
+/// `Int32` and `Int64` are handled, and any other type produces a
+/// `DataFusionError::Internal`. Slots that are null in either input are
+/// null in the output.
+pub(crate) fn bitwise_or(left: ArrayRef, right: ArrayRef) -> Result<ArrayRef> {
+    let data_type = left.data_type();
+    match data_type {
+        DataType::Int8 => binary_bitwise_array_op!(left, right, |, Int8Array, i8),
+        DataType::Int16 => binary_bitwise_array_op!(left, right, |, Int16Array, i16),
+        DataType::Int32 => binary_bitwise_array_op!(left, right, |, Int32Array, i32),
+        DataType::Int64 => binary_bitwise_array_op!(left, right, |, Int64Array, i64),
+        unsupported => Err(DataFusionError::Internal(format!(
+            "Data type {:?} not supported for binary operation '{}' on dyn arrays",
+            unsupported,
+            Operator::BitwiseOr
+        ))),
+    }
+}
+
+/// Bitwise AND (`&`) of every slot of `array` with a single `scalar`.
+///
+/// `Int8`, `Int16`, `Int32` and `Int64` arrays are handled; any other
+/// array type yields a `DataFusionError::Internal`. The outcome is always
+/// wrapped in `Some`.
+pub(crate) fn bitwise_and_scalar(
+    array: &dyn Array,
+    scalar: ScalarValue,
+) -> Option<Result<ArrayRef>> {
+    Some(match array.data_type() {
+        DataType::Int8 => {
+            binary_bitwise_array_scalar!(array, scalar, &, Int8Array, i8)
+        }
+        DataType::Int16 => {
+            binary_bitwise_array_scalar!(array, scalar, &, Int16Array, i16)
+        }
+        DataType::Int32 => {
+            binary_bitwise_array_scalar!(array, scalar, &, Int32Array, i32)
+        }
+        DataType::Int64 => {
+            binary_bitwise_array_scalar!(array, scalar, &, Int64Array, i64)
+        }
+        unsupported => Err(DataFusionError::Internal(format!(
+            "Data type {:?} not supported for binary operation '{}' on dyn arrays",
+            unsupported,
+            Operator::BitwiseAnd
+        ))),
+    })
+}
+
+/// Bitwise OR (`|`) of every slot of `array` with a single `scalar`.
+///
+/// `Int8`, `Int16`, `Int32` and `Int64` arrays are handled; any other
+/// array type yields a `DataFusionError::Internal`. The outcome is always
+/// wrapped in `Some`.
+pub(crate) fn bitwise_or_scalar(
+    array: &dyn Array,
+    scalar: ScalarValue,
+) -> Option<Result<ArrayRef>> {
+    Some(match array.data_type() {
+        DataType::Int8 => {
+            binary_bitwise_array_scalar!(array, scalar, |, Int8Array, i8)
+        }
+        DataType::Int16 => {
+            binary_bitwise_array_scalar!(array, scalar, |, Int16Array, i16)
+        }
+        DataType::Int32 => {
+            binary_bitwise_array_scalar!(array, scalar, |, Int32Array, i32)
+        }
+        DataType::Int64 => {
+            binary_bitwise_array_scalar!(array, scalar, |, Int64Array, i64)
+        }
+        unsupported => Err(DataFusionError::Internal(format!(
+            "Data type {:?} not supported for binary operation '{}' on dyn arrays",
+            unsupported,
+            Operator::BitwiseOr
+        ))),
+    })
+}
+
+/// Concatenates two string arrays element by element (`left || right`).
+///
+/// The result is `NULL` wherever either input is `NULL`:
+/// 1. 'a' || 'b' = 'ab'
+/// 2. 'a' || NULL = NULL
+///
+/// # Errors
+/// Returns `DataFusionError::Internal` when either input is not a
+/// `StringArray` or when the two inputs differ in length.
+pub(crate) fn string_concat(left: ArrayRef, right: ArrayRef) -> Result<ArrayRef> {
+    // Report a wrong input type as an error instead of panicking on `unwrap`
+    fn as_string_array<'a>(array: &'a ArrayRef, side: &str) -> Result<&'a StringArray> {
+        array
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "string_concat expected a StringArray for the {} input, got {:?}",
+                    side,
+                    array.data_type()
+                ))
+            })
+    }
+
+    let left_array = as_string_array(&left, "left")?;
+    let right_array = as_string_array(&right, "right")?;
+    // `zip` would silently truncate to the shorter input, so reject a
+    // length mismatch explicitly
+    if left_array.len() != right_array.len() {
+        return Err(DataFusionError::Internal(format!(
+            "string_concat inputs differ in length: {} vs {}",
+            left_array.len(),
+            right_array.len()
+        )));
+    }
+    let result = left_array
+        .iter()
+        .zip(right_array.iter())
+        .map(|(l, r)| match (l, r) {
+            // `concat` allocates the joined string once at its final size
+            (Some(l), Some(r)) => Some([l, r].concat()),
+            _ => None,
+        })
+        .collect::<StringArray>();
+    Ok(Arc::new(result) as ArrayRef)
+}
diff --git a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
new file mode 100644
index 000000000..ba8fff716
--- /dev/null
+++ b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
@@ -0,0 +1,647 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains computation kernels that are eventually
+//! destined for arrow-rs but are in datafusion until they are ported.
+
+use arrow::error::ArrowError;
+use arrow::{array::*, datatypes::ArrowNumericType};
+use datafusion_common::{DataFusionError, Result};
+
+// Simple (low performance) kernels until optimized kernels are added to arrow
+// See https://github.com/apache/arrow-rs/issues/960
+
+/// `IS DISTINCT FROM` for boolean arrays.
+///
+/// Unlike `neq_bool`, the output never contains nulls: the optional values
+/// are compared directly, so null vs null is `false` and null vs a value is
+/// `true`.
+pub(crate) fn is_distinct_from_bool(
+    left: &BooleanArray,
+    right: &BooleanArray,
+) -> Result<BooleanArray> {
+    let distinct: BooleanArray = left
+        .iter()
+        .zip(right.iter())
+        .map(|(l, r)| Some(l != r))
+        .collect();
+    Ok(distinct)
+}
+
+/// `IS NOT DISTINCT FROM` for boolean arrays.
+///
+/// The optional values are compared directly, so the output never contains
+/// nulls: null vs null is `true` and null vs a value is `false`.
+pub(crate) fn is_not_distinct_from_bool(
+    left: &BooleanArray,
+    right: &BooleanArray,
+) -> Result<BooleanArray> {
+    let same: BooleanArray = left
+        .iter()
+        .zip(right.iter())
+        .map(|(l, r)| Some(l == r))
+        .collect();
+    Ok(same)
+}
+
+/// `IS DISTINCT FROM` for primitive numeric arrays.
+///
+/// The optional values are compared directly, so the output never contains
+/// nulls: null vs null is `false` and null vs a value is `true`.
+pub(crate) fn is_distinct_from<T>(
+    left: &PrimitiveArray<T>,
+    right: &PrimitiveArray<T>,
+) -> Result<BooleanArray>
+where
+    T: ArrowNumericType,
+{
+    let distinct: BooleanArray = left
+        .iter()
+        .zip(right.iter())
+        .map(|(l, r)| Some(l != r))
+        .collect();
+    Ok(distinct)
+}
+
+/// `IS DISTINCT FROM` for string arrays of either offset size.
+///
+/// The optional values are compared directly, so the output never contains
+/// nulls: null vs null is `false` and null vs a value is `true`.
+pub(crate) fn is_distinct_from_utf8<OffsetSize: OffsetSizeTrait>(
+    left: &GenericStringArray<OffsetSize>,
+    right: &GenericStringArray<OffsetSize>,
+) -> Result<BooleanArray> {
+    let distinct: BooleanArray = left
+        .iter()
+        .zip(right.iter())
+        .map(|(l, r)| Some(l != r))
+        .collect();
+    Ok(distinct)
+}
+
+/// `IS DISTINCT FROM` for two null arrays: every row is `false`.
+///
+/// Only the length of `left` is used; `_right` is ignored.
+pub(crate) fn is_distinct_from_null(
+    left: &NullArray,
+    _right: &NullArray,
+) -> Result<BooleanArray> {
+    make_boolean_array(left.len(), false)
+}
+
+/// `IS NOT DISTINCT FROM` for two null arrays: every row is `true`.
+///
+/// Only the length of `left` is used; `_right` is ignored.
+pub(crate) fn is_not_distinct_from_null(
+    left: &NullArray,
+    _right: &NullArray,
+) -> Result<BooleanArray> {
+    make_boolean_array(left.len(), true)
+}
+
+/// Builds a `BooleanArray` of `length` non-null slots, all set to `value`.
+fn make_boolean_array(length: usize, value: bool) -> Result<BooleanArray> {
+    let filled: BooleanArray = std::iter::repeat(Some(value)).take(length).collect();
+    Ok(filled)
+}
+
+/// `IS NOT DISTINCT FROM` for primitive numeric arrays.
+///
+/// The optional values are compared directly, so the output never contains
+/// nulls: null vs null is `true` and null vs a value is `false`.
+pub(crate) fn is_not_distinct_from<T>(
+    left: &PrimitiveArray<T>,
+    right: &PrimitiveArray<T>,
+) -> Result<BooleanArray>
+where
+    T: ArrowNumericType,
+{
+    let same: BooleanArray = left
+        .iter()
+        .zip(right.iter())
+        .map(|(l, r)| Some(l == r))
+        .collect();
+    Ok(same)
+}
+
+/// `IS NOT DISTINCT FROM` for string arrays of either offset size.
+///
+/// The optional values are compared directly, so the output never contains
+/// nulls: null vs null is `true` and null vs a value is `false`.
+pub(crate) fn is_not_distinct_from_utf8<OffsetSize: OffsetSizeTrait>(
+    left: &GenericStringArray<OffsetSize>,
+    right: &GenericStringArray<OffsetSize>,
+) -> Result<BooleanArray> {
+    let same: BooleanArray = left
+        .iter()
+        .zip(right.iter())
+        .map(|(l, r)| Some(l == r))
+        .collect();
+    Ok(same)
+}
+
+// TODO move decimal kernels to arrow-rs
+// https://github.com/apache/arrow-rs/issues/1200
+
+/// Builds a `BooleanArray` with one slot per slot of `left` by calling
+/// `op(value, right)` on every non-null value; null slots stay null.
+pub(crate) fn compare_decimal_scalar<F>(
+    left: &Decimal128Array,
+    right: i128,
+    op: F,
+) -> Result<BooleanArray>
+where
+    F: Fn(i128, i128) -> bool,
+{
+    let mask: BooleanArray = left
+        .iter()
+        .map(|slot| slot.map(|value| op(value, right)))
+        .collect();
+    Ok(mask)
+}
+
+/// Builds a `BooleanArray` by calling `op(l, r)` on each pair of slots
+/// taken from `left` and `right`; a pair with a null on either side
+/// produces a null slot.
+pub(crate) fn compare_decimal<F>(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+    op: F,
+) -> Result<BooleanArray>
+where
+    F: Fn(i128, i128) -> bool,
+{
+    let mask: BooleanArray = left
+        .iter()
+        .zip(right.iter())
+        .map(|pair| match pair {
+            (Some(l), Some(r)) => Some(op(l, r)),
+            _ => None,
+        })
+        .collect();
+    Ok(mask)
+}
+
+/// `left == right` for each non-null slot of `left`; null slots stay null.
+pub(crate) fn eq_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<BooleanArray> {
+    compare_decimal_scalar(left, right, |lhs, rhs| lhs == rhs)
+}
+
+/// Slot-wise `left == right`; a null on either side gives a null slot.
+pub(crate) fn eq_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<BooleanArray> {
+    compare_decimal(left, right, |lhs, rhs| lhs == rhs)
+}
+
+/// `left != right` for each non-null slot of `left`; null slots stay null.
+pub(crate) fn neq_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<BooleanArray> {
+    compare_decimal_scalar(left, right, |lhs, rhs| lhs != rhs)
+}
+
+/// Slot-wise `left != right`; a null on either side gives a null slot.
+pub(crate) fn neq_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<BooleanArray> {
+    compare_decimal(left, right, |lhs, rhs| lhs != rhs)
+}
+
+/// `left < right` for each non-null slot of `left`; null slots stay null.
+pub(crate) fn lt_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<BooleanArray> {
+    compare_decimal_scalar(left, right, |lhs, rhs| lhs < rhs)
+}
+
+/// Slot-wise `left < right`; a null on either side gives a null slot.
+pub(crate) fn lt_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<BooleanArray> {
+    compare_decimal(left, right, |lhs, rhs| lhs < rhs)
+}
+
+/// `left <= right` for each non-null slot of `left`; null slots stay null.
+pub(crate) fn lt_eq_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<BooleanArray> {
+    compare_decimal_scalar(left, right, |lhs, rhs| lhs <= rhs)
+}
+
+/// Slot-wise `left <= right`; a null on either side gives a null slot.
+pub(crate) fn lt_eq_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<BooleanArray> {
+    compare_decimal(left, right, |lhs, rhs| lhs <= rhs)
+}
+
+/// `left > right` for each non-null slot of `left`; null slots stay null.
+pub(crate) fn gt_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<BooleanArray> {
+    compare_decimal_scalar(left, right, |lhs, rhs| lhs > rhs)
+}
+
+/// Slot-wise `left > right`; a null on either side gives a null slot.
+pub(crate) fn gt_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<BooleanArray> {
+    compare_decimal(left, right, |lhs, rhs| lhs > rhs)
+}
+
+/// `left >= right` for each non-null slot of `left`; null slots stay null.
+pub(crate) fn gt_eq_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<BooleanArray> {
+    compare_decimal_scalar(left, right, |lhs, rhs| lhs >= rhs)
+}
+
+/// Slot-wise `left >= right`; a null on either side gives a null slot.
+pub(crate) fn gt_eq_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<BooleanArray> {
+    compare_decimal(left, right, |lhs, rhs| lhs >= rhs)
+}
+
+/// `IS DISTINCT FROM` for decimal arrays.
+///
+/// The output never contains nulls: two nulls are not distinct (`false`),
+/// a null and a value are distinct (`true`), and two values are distinct
+/// when they differ.
+pub(crate) fn is_distinct_from_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<BooleanArray> {
+    // Comparing the `Option`s directly gives exactly that truth table
+    let distinct: BooleanArray = left
+        .iter()
+        .zip(right.iter())
+        .map(|(l, r)| Some(l != r))
+        .collect();
+    Ok(distinct)
+}
+
+/// `IS NOT DISTINCT FROM` for decimal arrays.
+///
+/// The output never contains nulls: two nulls give `true`, a null and a
+/// value give `false`, and two values give `true` when they are equal.
+pub(crate) fn is_not_distinct_from_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<BooleanArray> {
+    // Comparing the `Option`s directly gives exactly that truth table
+    let same: BooleanArray = left
+        .iter()
+        .zip(right.iter())
+        .map(|(l, r)| Some(l == r))
+        .collect();
+    Ok(same)
+}
+
+/// Builds a `Decimal128Array` by calling the fallible `op(l, r)` on each
+/// pair of slots taken from `left` and `right`.
+///
+/// A pair with a null on either side produces a null slot without calling
+/// `op`. The first error returned by `op` stops the collection and is
+/// returned to the caller.
+pub(crate) fn arith_decimal<F>(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+    op: F,
+) -> Result<Decimal128Array>
+where
+    F: Fn(i128, i128) -> Result<i128>,
+{
+    left.iter()
+        .zip(right.iter())
+        .map(|pair| match pair {
+            (Some(l), Some(r)) => op(l, r).map(Some),
+            _ => Ok(None),
+        })
+        .collect()
+}
+
+/// Builds a `Decimal128Array` by calling the fallible `op(value, right)`
+/// on every non-null slot of `left`.
+///
+/// Null slots stay null without calling `op`. The first error returned by
+/// `op` stops the collection and is returned to the caller.
+pub(crate) fn arith_decimal_scalar<F>(
+    left: &Decimal128Array,
+    right: i128,
+    op: F,
+) -> Result<Decimal128Array>
+where
+    F: Fn(i128, i128) -> Result<i128>,
+{
+    left.iter()
+        .map(|slot| slot.map(|value| op(value, right)).transpose())
+        .collect()
+}
+
+/// Slot-wise `left + right` on the raw `i128` values.
+///
+/// The result carries the precision and scale of `left`; a null on either
+/// side gives a null slot.
+pub(crate) fn add_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<Decimal128Array> {
+    let sums = arith_decimal(left, right, |l, r| Ok(l + r))?;
+    let sums = sums.with_precision_and_scale(left.precision(), left.scale())?;
+    Ok(sums)
+}
+
+/// Adds the raw `i128` value `right` to every non-null slot of `left`.
+///
+/// The result carries the precision and scale of `left`; null slots stay
+/// null.
+pub(crate) fn add_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<Decimal128Array> {
+    let sums = arith_decimal_scalar(left, right, |l, r| Ok(l + r))?;
+    let sums = sums.with_precision_and_scale(left.precision(), left.scale())?;
+    Ok(sums)
+}
+
+/// Slot-wise `left - right` on the raw `i128` values.
+///
+/// The result carries the precision and scale of `left`; a null on either
+/// side gives a null slot.
+pub(crate) fn subtract_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<Decimal128Array> {
+    let differences = arith_decimal(left, right, |l, r| Ok(l - r))?;
+    let differences =
+        differences.with_precision_and_scale(left.precision(), left.scale())?;
+    Ok(differences)
+}
+
+/// Subtracts the raw `i128` value `right` from every non-null slot of
+/// `left`.
+///
+/// The result carries the precision and scale of `left`; null slots stay
+/// null.
+pub(crate) fn subtract_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<Decimal128Array> {
+    let differences = arith_decimal_scalar(left, right, |l, r| Ok(l - r))?;
+    let differences =
+        differences.with_precision_and_scale(left.precision(), left.scale())?;
+    Ok(differences)
+}
+
+/// Slot-wise decimal multiplication.
+///
+/// The raw `i128` values are multiplied and the product is divided by
+/// `10^scale` of `left` to bring it back to that scale (integer division).
+/// The result carries the precision and scale of `left`; a null on either
+/// side gives a null slot.
+pub(crate) fn multiply_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<Decimal128Array> {
+    let scale_factor = 10_i128.pow(left.scale() as u32);
+    let products = arith_decimal(left, right, |l, r| Ok(l * r / scale_factor))?;
+    let products = products.with_precision_and_scale(left.precision(), left.scale())?;
+    Ok(products)
+}
+
+/// Multiplies every non-null slot of `left` by the raw `i128` value
+/// `right`.
+///
+/// The product is divided by `10^scale` of `left` to bring it back to that
+/// scale (integer division). The result carries the precision and scale of
+/// `left`; null slots stay null.
+pub(crate) fn multiply_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<Decimal128Array> {
+    let scale_factor = 10_i128.pow(left.scale() as u32);
+    let products = arith_decimal_scalar(left, right, |l, r| Ok(l * r / scale_factor))?;
+    let products = products.with_precision_and_scale(left.precision(), left.scale())?;
+    Ok(products)
+}
+
+/// Slot-wise decimal division `left / right`.
+///
+/// Both raw values are converted to `f64`, divided, multiplied by
+/// `10^scale` of `left` and truncated back to `i128`. The result carries
+/// the precision and scale of `left`; a null on either side gives a null
+/// slot.
+///
+/// # Errors
+/// Returns `ArrowError::DivideByZero` (wrapped in `DataFusionError`) when a
+/// divisor paired with a non-null dividend is zero, matching
+/// `modulus_decimal`. Without this check the float division would yield
+/// infinity or NaN, which the `as i128` cast silently turns into
+/// `i128::MAX`, `i128::MIN` or `0`.
+pub(crate) fn divide_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<Decimal128Array> {
+    let mul = 10_f64.powi(left.scale() as i32);
+    let array = arith_decimal(left, right, |left, right| {
+        if right == 0 {
+            return Err(DataFusionError::ArrowError(ArrowError::DivideByZero));
+        }
+        let l_value = left as f64;
+        let r_value = right as f64;
+        let result = ((l_value / r_value) * mul) as i128;
+        Ok(result)
+    })?
+    .with_precision_and_scale(left.precision(), left.scale())?;
+    Ok(array)
+}
+
+/// Divides every non-null slot of `left` by the raw `i128` value `right`.
+///
+/// Both raw values are converted to `f64`, divided, multiplied by
+/// `10^scale` of `left` and truncated back to `i128`. The result carries
+/// the precision and scale of `left`; null slots stay null.
+///
+/// # Errors
+/// Returns `ArrowError::DivideByZero` (wrapped in `DataFusionError`) when
+/// `right` is zero, matching `modulus_decimal_scalar`. Without this check
+/// the float division would yield infinity or NaN, which the `as i128`
+/// cast silently turns into `i128::MAX`, `i128::MIN` or `0`.
+pub(crate) fn divide_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<Decimal128Array> {
+    if right == 0 {
+        return Err(DataFusionError::ArrowError(ArrowError::DivideByZero));
+    }
+    let mul = 10_f64.powi(left.scale() as i32);
+    let array = arith_decimal_scalar(left, right, |left, right| {
+        let l_value = left as f64;
+        let r_value = right as f64;
+        let result = ((l_value / r_value) * mul) as i128;
+        Ok(result)
+    })?
+    .with_precision_and_scale(left.precision(), left.scale())?;
+    Ok(array)
+}
+
+/// Slot-wise remainder `left % right` on the raw `i128` values.
+///
+/// The result carries the precision and scale of `left`; a null on either
+/// side gives a null slot. A zero divisor paired with a non-null dividend
+/// fails with `ArrowError::DivideByZero`.
+pub(crate) fn modulus_decimal(
+    left: &Decimal128Array,
+    right: &Decimal128Array,
+) -> Result<Decimal128Array> {
+    let remainders = arith_decimal(left, right, |l, r| match r {
+        0 => Err(DataFusionError::ArrowError(ArrowError::DivideByZero)),
+        _ => Ok(l % r),
+    })?;
+    let remainders =
+        remainders.with_precision_and_scale(left.precision(), left.scale())?;
+    Ok(remainders)
+}
+
+/// Remainder of every non-null slot of `left` divided by the raw `i128`
+/// value `right`.
+///
+/// The result carries the precision and scale of `left`; null slots stay
+/// null. A zero `right` fails with `ArrowError::DivideByZero` before any
+/// slot is visited.
+pub(crate) fn modulus_decimal_scalar(
+    left: &Decimal128Array,
+    right: i128,
+) -> Result<Decimal128Array> {
+    match right {
+        0 => Err(DataFusionError::ArrowError(ArrowError::DivideByZero)),
+        divisor => {
+            let remainders = arith_decimal_scalar(left, divisor, |l, r| Ok(l % r))?;
+            let remainders =
+                remainders.with_precision_and_scale(left.precision(), left.scale())?;
+            Ok(remainders)
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Test helper: builds a `Decimal128Array` with the given precision and
+    /// scale from a slice of optional raw values (`None` becomes a null slot).
+    fn create_decimal_array(
+        array: &[Option<i128>],
+        precision: usize,
+        scale: usize,
+    ) -> Decimal128Array {
+        let mut builder = Decimal128Builder::new(array.len(), precision, scale);
+        for item in array.iter().copied() {
+            if let Some(raw) = item {
+                builder.append_value(raw).expect("valid value");
+            } else {
+                builder.append_null();
+            }
+        }
+        builder.finish()
+    }
+
+    /// Exercises every decimal comparison kernel, first array vs scalar and
+    /// then array vs array, including the two `IS [NOT] DISTINCT FROM` kernels.
+    #[test]
+    fn comparison_decimal_op_test() -> Result<()> {
+        let value_i128: i128 = 123;
+        let decimal_array = create_decimal_array(
+            &[
+                Some(value_i128),
+                None,
+                Some(value_i128 - 1),
+                Some(value_i128 + 1),
+            ],
+            25,
+            3,
+        );
+        // eq: array = i128
+        let result = eq_decimal_scalar(&decimal_array, value_i128)?;
+        assert_eq!(
+            BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
+            result
+        );
+        // neq: array != i128
+        let result = neq_decimal_scalar(&decimal_array, value_i128)?;
+        assert_eq!(
+            BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
+            result
+        );
+        // lt: array < i128
+        let result = lt_decimal_scalar(&decimal_array, value_i128)?;
+        assert_eq!(
+            BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
+            result
+        );
+        // lt_eq: array <= i128
+        let result = lt_eq_decimal_scalar(&decimal_array, value_i128)?;
+        assert_eq!(
+            BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
+            result
+        );
+        // gt: array > i128
+        let result = gt_decimal_scalar(&decimal_array, value_i128)?;
+        assert_eq!(
+            BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
+            result
+        );
+        // gt_eq: array >= i128
+        let result = gt_eq_decimal_scalar(&decimal_array, value_i128)?;
+        assert_eq!(
+            BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
+            result
+        );
+
+        // Array vs array: the left side has a null in slot 1, the right side
+        // has no nulls
+        let left_decimal_array = decimal_array;
+        let right_decimal_array = create_decimal_array(
+            &[
+                Some(value_i128 - 1),
+                Some(value_i128),
+                Some(value_i128 + 1),
+                Some(value_i128 + 1),
+            ],
+            25,
+            3,
+        );
+        // eq: left == right
+        let result = eq_decimal(&left_decimal_array, &right_decimal_array)?;
+        assert_eq!(
+            BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
+            result
+        );
+        // neq: left != right
+        let result = neq_decimal(&left_decimal_array, &right_decimal_array)?;
+        assert_eq!(
+            BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
+            result
+        );
+        // lt: left < right
+        let result = lt_decimal(&left_decimal_array, &right_decimal_array)?;
+        assert_eq!(
+            BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
+            result
+        );
+        // lt_eq: left <= right
+        let result = lt_eq_decimal(&left_decimal_array, &right_decimal_array)?;
+        assert_eq!(
+            BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
+            result
+        );
+        // gt: left > right
+        let result = gt_decimal(&left_decimal_array, &right_decimal_array)?;
+        assert_eq!(
+            BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
+            result
+        );
+        // gt_eq: left >= right
+        let result = gt_eq_decimal(&left_decimal_array, &right_decimal_array)?;
+        assert_eq!(
+            BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
+            result
+        );
+        // is_distinct: left distinct from right (the null slot compares as `true`)
+        let result = is_distinct_from_decimal(&left_decimal_array, &right_decimal_array)?;
+        assert_eq!(
+            BooleanArray::from(vec![Some(true), Some(true), Some(true), Some(false)]),
+            result
+        );
+        // is_not_distinct: left not distinct from right (the null slot compares as `false`)
+        let result =
+            is_not_distinct_from_decimal(&left_decimal_array, &right_decimal_array)?;
+        assert_eq!(
+            BooleanArray::from(vec![Some(false), Some(false), Some(false), Some(true)]),
+            result
+        );
+        Ok(())
+    }
+
+    /// Exercises the decimal arithmetic kernels (add, subtract, multiply,
+    /// divide, modulus) in both their array and scalar forms. All arrays use
+    /// precision 25 and scale 3, and the expected values are raw `i128`s.
+    #[test]
+    fn arithmetic_decimal_op_test() -> Result<()> {
+        let value_i128: i128 = 123;
+        let left_decimal_array = create_decimal_array(
+            &[
+                Some(value_i128),
+                None,
+                Some(value_i128 - 1),
+                Some(value_i128 + 1),
+            ],
+            25,
+            3,
+        );
+        let right_decimal_array = create_decimal_array(
+            &[
+                Some(value_i128),
+                Some(value_i128),
+                Some(value_i128),
+                Some(value_i128),
+            ],
+            25,
+            3,
+        );
+        // add
+        let result = add_decimal(&left_decimal_array, &right_decimal_array)?;
+        let expect =
+            create_decimal_array(&[Some(246), None, Some(245), Some(247)], 25, 3);
+        assert_eq!(expect, result);
+        let result = add_decimal_scalar(&left_decimal_array, 10)?;
+        let expect =
+            create_decimal_array(&[Some(133), None, Some(132), Some(134)], 25, 3);
+        assert_eq!(expect, result);
+        // subtract
+        let result = subtract_decimal(&left_decimal_array, &right_decimal_array)?;
+        let expect = create_decimal_array(&[Some(0), None, Some(-1), Some(1)], 25, 3);
+        assert_eq!(expect, result);
+        let result = subtract_decimal_scalar(&left_decimal_array, 10)?;
+        let expect =
+            create_decimal_array(&[Some(113), None, Some(112), Some(114)], 25, 3);
+        assert_eq!(expect, result);
+        // multiply: the raw product is divided by 10^3, e.g. 123 * 123 / 1000 = 15
+        let result = multiply_decimal(&left_decimal_array, &right_decimal_array)?;
+        let expect = create_decimal_array(&[Some(15), None, Some(15), Some(15)], 25, 3);
+        assert_eq!(expect, result);
+        let result = multiply_decimal_scalar(&left_decimal_array, 10)?;
+        let expect = create_decimal_array(&[Some(1), None, Some(1), Some(1)], 25, 3);
+        assert_eq!(expect, result);
+        // divide: the raw quotient is multiplied by 10^3 and truncated
+        let left_decimal_array = create_decimal_array(
+            &[Some(1234567), None, Some(1234567), Some(1234567)],
+            25,
+            3,
+        );
+        let right_decimal_array =
+            create_decimal_array(&[Some(10), Some(100), Some(55), Some(-123)], 25, 3);
+        let result = divide_decimal(&left_decimal_array, &right_decimal_array)?;
+        let expect = create_decimal_array(
+            &[Some(123456700), None, Some(22446672), Some(-10037130)],
+            25,
+            3,
+        );
+        assert_eq!(expect, result);
+        let result = divide_decimal_scalar(&left_decimal_array, 10)?;
+        let expect = create_decimal_array(
+            &[Some(123456700), None, Some(123456700), Some(123456700)],
+            25,
+            3,
+        );
+        assert_eq!(expect, result);
+        // modulus: reuses the divide inputs above
+        let result = modulus_decimal(&left_decimal_array, &right_decimal_array)?;
+        let expect = create_decimal_array(&[Some(7), None, Some(37), Some(16)], 25, 3);
+        assert_eq!(expect, result);
+        let result = modulus_decimal_scalar(&left_decimal_array, 10)?;
+        let expect = create_decimal_array(&[Some(7), None, Some(7), Some(7)], 25, 3);
+        assert_eq!(expect, result);
+
+        Ok(())
+    }
+}