You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ak...@apache.org on 2023/04/24 08:13:52 UTC

[arrow-datafusion] branch main updated: Remove temporal to kernels_arrow (#6069)

This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 655f2a84d8 Remove temporal to kernels_arrow (#6069)
655f2a84d8 is described below

commit 655f2a84d86a45267ecdfde1860d3748309f270f
Author: Berkay Şahin <12...@users.noreply.github.com>
AuthorDate: Mon Apr 24 11:13:46 2023 +0300

    Remove temporal to kernels_arrow (#6069)
    
    * ready to review
    
    * clippy fix
    
    * clippy fix
    
    * Simple code refactor
    
    * fix clippy
    
    * Update datafusion/physical-expr/src/expressions/datetime.rs
    
    Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    
    ---------
    
    Co-authored-by: metesynnada <10...@users.noreply.github.com>
    Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
---
 datafusion/physical-expr/src/expressions/binary.rs |  52 +++++++--
 .../src/expressions/binary/kernels_arrow.rs        | 123 ++++++++++++++++++++-
 .../physical-expr/src/expressions/datetime.rs      | 121 ++++----------------
 3 files changed, 181 insertions(+), 115 deletions(-)

diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index b98a2dc420..20d1d9e0d9 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -68,14 +68,15 @@ use kernels::{
     bitwise_xor, bitwise_xor_scalar,
 };
 use kernels_arrow::{
-    add_decimal_dyn_scalar, add_dyn_decimal, divide_decimal_dyn_scalar,
-    divide_dyn_opt_decimal, is_distinct_from, is_distinct_from_bool,
-    is_distinct_from_decimal, is_distinct_from_f32, is_distinct_from_f64,
-    is_distinct_from_null, is_distinct_from_utf8, is_not_distinct_from,
-    is_not_distinct_from_bool, is_not_distinct_from_decimal, is_not_distinct_from_f32,
-    is_not_distinct_from_f64, is_not_distinct_from_null, is_not_distinct_from_utf8,
-    modulus_decimal_dyn_scalar, modulus_dyn_decimal, multiply_decimal_dyn_scalar,
-    multiply_dyn_decimal, subtract_decimal_dyn_scalar, subtract_dyn_decimal,
+    add_decimal_dyn_scalar, add_dyn_decimal, add_dyn_temporal, add_dyn_temporal_scalar,
+    divide_decimal_dyn_scalar, divide_dyn_opt_decimal, is_distinct_from,
+    is_distinct_from_bool, is_distinct_from_decimal, is_distinct_from_f32,
+    is_distinct_from_f64, is_distinct_from_null, is_distinct_from_utf8,
+    is_not_distinct_from, is_not_distinct_from_bool, is_not_distinct_from_decimal,
+    is_not_distinct_from_f32, is_not_distinct_from_f64, is_not_distinct_from_null,
+    is_not_distinct_from_utf8, modulus_decimal_dyn_scalar, modulus_dyn_decimal,
+    multiply_decimal_dyn_scalar, multiply_dyn_decimal, subtract_decimal_dyn_scalar,
+    subtract_dyn_decimal, subtract_dyn_temporal, subtract_dyn_temporal_scalar,
 };
 
 use arrow::datatypes::{DataType, Schema, TimeUnit};
@@ -1312,10 +1313,48 @@ macro_rules! sub_timestamp_macro {
         Arc::new(ret) as ArrayRef
     }};
 }
+
+/// Evaluates a temporal array-array operation selected by `sign`:
+/// `1` adds via `add_dyn_temporal`, `-1` subtracts via `subtract_dyn_temporal`.
+///
+/// Returns an internal error for any other `sign`.
+pub fn resolve_temporal_op(
+    lhs: &ArrayRef,
+    sign: i32,
+    rhs: &ArrayRef,
+) -> Result<ArrayRef> {
+    match sign {
+        1 => add_dyn_temporal(lhs, rhs),
+        -1 => subtract_dyn_temporal(lhs, rhs),
+        other => Err(DataFusionError::Internal(format!(
+            "Undefined sign {other} for temporal operation, expected 1 or -1"
+        ))),
+    }
+}
+
+/// Evaluates a temporal array-scalar operation selected by `sign`:
+/// `1` adds via `add_dyn_temporal_scalar`, `-1` subtracts via
+/// `subtract_dyn_temporal_scalar`.
+///
+/// Returns an internal error for any other `sign`.
+pub fn resolve_temporal_op_scalar(
+    lhs: &ArrayRef,
+    sign: i32,
+    rhs: &ScalarValue,
+) -> Result<ColumnarValue> {
+    match sign {
+        1 => add_dyn_temporal_scalar(lhs, rhs),
+        -1 => subtract_dyn_temporal_scalar(lhs, rhs),
+        other => Err(DataFusionError::Internal(format!(
+            "Undefined sign {other} for temporal operation, expected 1 or -1"
+        ))),
+    }
+}
+
 /// This function handles the Timestamp - Timestamp operations,
 /// where the first one is an array, and the second one is a scalar,
 /// hence the result is also an array.
-pub fn ts_scalar_ts_op(array: ArrayRef, scalar: &ScalarValue) -> Result<ColumnarValue> {
+pub fn ts_scalar_ts_op(array: &ArrayRef, scalar: &ScalarValue) -> Result<ColumnarValue> {
     let ret = match (array.data_type(), scalar) {
         (
             DataType::Timestamp(TimeUnit::Second, opt_tz_lhs),
@@ -1410,7 +1449,7 @@ macro_rules! sub_timestamp_interval_macro {
 /// where the first one is an array, and the second one is a scalar,
 /// hence the result is also an array.
 pub fn ts_scalar_interval_op(
-    array: ArrayRef,
+    array: &ArrayRef,
     sign: i32,
     scalar: &ScalarValue,
 ) -> Result<ColumnarValue> {
@@ -1494,7 +1533,7 @@ macro_rules! sub_interval_cross_macro {
 /// where the first one is an array, and the second one is a scalar,
 /// hence the result is also an interval array.
 pub fn interval_scalar_interval_op(
-    array: ArrayRef,
+    array: &ArrayRef,
     sign: i32,
     scalar: &ScalarValue,
 ) -> Result<ColumnarValue> {
diff --git a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
index 8998738ba5..3b93d6f792 100644
--- a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
+++ b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
@@ -21,17 +21,24 @@
 use arrow::compute::{
     add_dyn, add_scalar_dyn, divide_dyn_opt, divide_scalar_dyn, modulus_dyn,
     modulus_scalar_dyn, multiply_dyn, multiply_scalar_dyn, subtract_dyn,
-    subtract_scalar_dyn,
+    subtract_scalar_dyn, try_unary,
 };
-use arrow::datatypes::Decimal128Type;
+use arrow::datatypes::{Date32Type, Date64Type, Decimal128Type};
 use arrow::{array::*, datatypes::ArrowNumericType, downcast_dictionary_array};
 use arrow_schema::DataType;
-use datafusion_common::cast::as_decimal128_array;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::cast::{as_date32_array, as_date64_array, as_decimal128_array};
+use datafusion_common::scalar::{date32_add, date64_add};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::type_coercion::binary::decimal_op_mathematics_type;
+use datafusion_expr::ColumnarValue;
 use datafusion_expr::Operator;
 use std::sync::Arc;
 
+use super::{
+    interval_array_op, interval_scalar_interval_op, ts_array_op, ts_interval_array_op,
+    ts_scalar_interval_op, ts_scalar_ts_op,
+};
+
 // Simple (low performance) kernels until optimized kernels are added to arrow
 // See https://github.com/apache/arrow-rs/issues/960
 
@@ -280,6 +287,77 @@ pub(crate) fn add_decimal_dyn_scalar(
     decimal_array_with_precision_scale(array, precision, scale)
 }
 
+/// Adds two temporal arrays element-wise.
+///
+/// Handles Interval + Interval, Timestamp + Interval and Interval + Timestamp;
+/// other type combinations fall back to arrow's `add_dyn` kernel.
+pub(crate) fn add_dyn_temporal(left: &ArrayRef, right: &ArrayRef) -> Result<ArrayRef> {
+    match (left.data_type(), right.data_type()) {
+        // `ts_array_op` computes a Timestamp - Timestamp difference, so it must not be
+        // used for addition; the sum of two timestamps has no meaning.
+        (DataType::Timestamp(..), DataType::Timestamp(..)) => {
+            Err(DataFusionError::Execution(format!(
+                "Invalid temporal operation: {} + {}",
+                left.data_type(),
+                right.data_type()
+            )))
+        }
+        (DataType::Interval(..), DataType::Interval(..)) => {
+            interval_array_op(left, right, 1)
+        }
+        (DataType::Timestamp(..), DataType::Interval(..)) => {
+            ts_interval_array_op(left, 1, right)
+        }
+        // Addition is commutative, so Interval + Timestamp == Timestamp + Interval.
+        (DataType::Interval(..), DataType::Timestamp(..)) => {
+            ts_interval_array_op(right, 1, left)
+        }
+        _ => {
+            // fall back to kernels in arrow-rs
+            Ok(add_dyn(left, right)?)
+        }
+    }
+}
+
+/// Adds a temporal scalar to every element of a temporal array.
+///
+/// Handles Date32/Date64 + Interval, Interval + Interval and Timestamp + Interval;
+/// other type combinations fall back to arrow's `add_dyn` kernel, with the scalar
+/// broadcast to the length of `left`.
+pub(crate) fn add_dyn_temporal_scalar(
+    left: &ArrayRef,
+    right: &ScalarValue,
+) -> Result<ColumnarValue> {
+    match (left.data_type(), right.get_datatype()) {
+        (DataType::Date32, DataType::Interval(..)) => {
+            let left = as_date32_array(&left)?;
+            let ret = Arc::new(try_unary::<Date32Type, _, Date32Type>(left, |days| {
+                Ok(date32_add(days, right, 1)?)
+            })?) as ArrayRef;
+            Ok(ColumnarValue::Array(ret))
+        }
+        (DataType::Date64, DataType::Interval(..)) => {
+            let left = as_date64_array(&left)?;
+            let ret = Arc::new(try_unary::<Date64Type, _, Date64Type>(left, |ms| {
+                Ok(date64_add(ms, right, 1)?)
+            })?) as ArrayRef;
+            Ok(ColumnarValue::Array(ret))
+        }
+        (DataType::Interval(..), DataType::Interval(..)) => {
+            interval_scalar_interval_op(left, 1, right)
+        }
+        (DataType::Timestamp(..), DataType::Interval(..)) => {
+            ts_scalar_interval_op(left, 1, right)
+        }
+        _ => {
+            // fall back to kernels in arrow-rs; `to_array` would produce a one-element
+            // array, so broadcast the scalar to `left`'s length instead
+            let right = right.to_array_of_size(left.len());
+            Ok(ColumnarValue::Array(add_dyn(left, &right)?))
+        }
+    }
+}
+
 pub(crate) fn subtract_decimal_dyn_scalar(
     left: &dyn Array,
     right: i128,
@@ -291,6 +369,80 @@ pub(crate) fn subtract_decimal_dyn_scalar(
     decimal_array_with_precision_scale(array, precision, scale)
 }
 
+/// Subtracts two temporal arrays element-wise.
+///
+/// Handles Timestamp - Timestamp, Interval - Interval and Timestamp - Interval;
+/// other type combinations fall back to arrow's `subtract_dyn` kernel.
+pub(crate) fn subtract_dyn_temporal(
+    left: &ArrayRef,
+    right: &ArrayRef,
+) -> Result<ArrayRef> {
+    match (left.data_type(), right.data_type()) {
+        (DataType::Timestamp(..), DataType::Timestamp(..)) => ts_array_op(left, right),
+        (DataType::Interval(..), DataType::Interval(..)) => {
+            interval_array_op(left, right, -1)
+        }
+        (DataType::Timestamp(..), DataType::Interval(..)) => {
+            ts_interval_array_op(left, -1, right)
+        }
+        // Interval - Timestamp has no meaning. Swapping the operands (valid only for
+        // addition) would silently compute Timestamp - Interval instead, so reject it.
+        (DataType::Interval(..), DataType::Timestamp(..)) => {
+            Err(DataFusionError::Execution(format!(
+                "Invalid temporal operation: {} - {}",
+                left.data_type(),
+                right.data_type()
+            )))
+        }
+        _ => {
+            // fall back to kernels in arrow-rs
+            Ok(subtract_dyn(left, right)?)
+        }
+    }
+}
+
+/// Subtracts a temporal scalar from every element of a temporal array.
+///
+/// Handles Date32/Date64 - Interval, Timestamp - Timestamp, Interval - Interval and
+/// Timestamp - Interval; other type combinations fall back to arrow's `subtract_dyn`
+/// kernel, with the scalar broadcast to the length of `left`.
+pub(crate) fn subtract_dyn_temporal_scalar(
+    left: &ArrayRef,
+    right: &ScalarValue,
+) -> Result<ColumnarValue> {
+    match (left.data_type(), right.get_datatype()) {
+        (DataType::Date32, DataType::Interval(..)) => {
+            let left = as_date32_array(&left)?;
+            let ret = Arc::new(try_unary::<Date32Type, _, Date32Type>(left, |days| {
+                Ok(date32_add(days, right, -1)?)
+            })?) as ArrayRef;
+            Ok(ColumnarValue::Array(ret))
+        }
+        (DataType::Date64, DataType::Interval(..)) => {
+            let left = as_date64_array(&left)?;
+            let ret = Arc::new(try_unary::<Date64Type, _, Date64Type>(left, |ms| {
+                Ok(date64_add(ms, right, -1)?)
+            })?) as ArrayRef;
+            Ok(ColumnarValue::Array(ret))
+        }
+        (DataType::Timestamp(..), DataType::Timestamp(..)) => {
+            ts_scalar_ts_op(left, right)
+        }
+        (DataType::Interval(..), DataType::Interval(..)) => {
+            interval_scalar_interval_op(left, -1, right)
+        }
+        (DataType::Timestamp(..), DataType::Interval(..)) => {
+            ts_scalar_interval_op(left, -1, right)
+        }
+        _ => {
+            // fall back to kernels in arrow-rs; `to_array` would produce a one-element
+            // array, so broadcast the scalar to `left`'s length instead
+            let right = right.to_array_of_size(left.len());
+            Ok(ColumnarValue::Array(subtract_dyn(left, &right)?))
+        }
+    }
+}
+
 fn get_precision_scale(data_type: &DataType) -> Result<(u8, i8)> {
     match data_type {
         DataType::Decimal128(precision, scale) => Ok((*precision, *scale)),
diff --git a/datafusion/physical-expr/src/expressions/datetime.rs b/datafusion/physical-expr/src/expressions/datetime.rs
index 4668745416..25f02be3b1 100644
--- a/datafusion/physical-expr/src/expressions/datetime.rs
+++ b/datafusion/physical-expr/src/expressions/datetime.rs
@@ -19,25 +19,17 @@ use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
 use crate::intervals::{apply_operator, Interval};
 use crate::physical_expr::down_cast_any_ref;
 use crate::PhysicalExpr;
-use arrow::array::{Array, ArrayRef};
-use arrow::compute::try_unary;
-use arrow::datatypes::{DataType, Date32Type, Date64Type, Schema};
+use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
 
-use datafusion_common::cast::*;
-use datafusion_common::scalar::*;
-use datafusion_common::Result;
-use datafusion_common::{DataFusionError, ScalarValue};
+use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::type_coercion::binary::coerce_types;
 use datafusion_expr::{ColumnarValue, Operator};
 use std::any::Any;
 use std::fmt::{Display, Formatter};
 use std::sync::Arc;
 
-use super::binary::{
-    interval_array_op, interval_scalar_interval_op, ts_array_op, ts_interval_array_op,
-    ts_scalar_interval_op, ts_scalar_ts_op,
-};
+use super::binary::{resolve_temporal_op, resolve_temporal_op_scalar};
 
 /// Perform DATE/TIME/TIMESTAMP +/ INTERVAL math
 #[derive(Debug)]
@@ -151,13 +143,18 @@ impl PhysicalExpr for DateTimeIntervalExpr {
                     operand_lhs.sub(&operand_rhs)?
                 }))
             }
-            (ColumnarValue::Array(array_lhs), ColumnarValue::Scalar(operand_rhs)) => {
-                evaluate_temporal_array(array_lhs, sign, &operand_rhs)
-            }
-
-            (ColumnarValue::Array(array_lhs), ColumnarValue::Array(array_rhs)) => {
-                evaluate_temporal_arrays(&array_lhs, sign, &array_rhs)
+            // Array op scalar: `resolve_temporal_op_scalar` adds (`sign == 1`) or subtracts
+            // (`sign == -1`), covering Date32/Date64 +/- Interval, Interval +/- Interval,
+            // Timestamp +/- Interval and Timestamp - Timestamp.
+            (ColumnarValue::Array(array_lhs), ColumnarValue::Scalar(scalar_rhs)) => {
+                resolve_temporal_op_scalar(&array_lhs, sign, &scalar_rhs)
             }
+            // Array op array: `resolve_temporal_op` adds (`sign == 1`) or subtracts
+            // (`sign == -1`), covering Timestamp - Timestamp, Interval +/- Interval,
+            // Timestamp +/- Interval and Interval + Timestamp.
+            (ColumnarValue::Array(array_lhs), ColumnarValue::Array(array_rhs)) => Ok(
+                ColumnarValue::Array(resolve_temporal_op(&array_lhs, sign, &array_rhs)?),
+            ),
             (_, _) => {
                 let msg = "If RHS of the operation is an array, then LHS also must be";
                 Err(DataFusionError::Internal(msg.to_string()))
@@ -223,82 +220,6 @@ impl PartialEq<dyn Any> for DateTimeIntervalExpr {
     }
 }
 
-pub fn evaluate_temporal_array(
-    array: ArrayRef,
-    sign: i32,
-    scalar: &ScalarValue,
-) -> Result<ColumnarValue> {
-    match (array.data_type(), scalar.get_datatype()) {
-        // Date +- Interval
-        (DataType::Date32, DataType::Interval(_)) => {
-            let array = as_date32_array(&array)?;
-            let ret = Arc::new(try_unary::<Date32Type, _, Date32Type>(array, |days| {
-                Ok(date32_add(days, scalar, sign)?)
-            })?) as ArrayRef;
-            Ok(ColumnarValue::Array(ret))
-        }
-        (DataType::Date64, DataType::Interval(_)) => {
-            let array = as_date64_array(&array)?;
-            let ret = Arc::new(try_unary::<Date64Type, _, Date64Type>(array, |ms| {
-                Ok(date64_add(ms, scalar, sign)?)
-            })?) as ArrayRef;
-            Ok(ColumnarValue::Array(ret))
-        }
-        // Timestamp - Timestamp
-        (DataType::Timestamp(_, _), DataType::Timestamp(_, _)) if sign == -1 => {
-            ts_scalar_ts_op(array, scalar)
-        }
-        // Interval +- Interval
-        (DataType::Interval(_), DataType::Interval(_)) => {
-            interval_scalar_interval_op(array, sign, scalar)
-        }
-        // Timestamp +- Interval
-        (DataType::Timestamp(_, _), DataType::Interval(_)) => {
-            ts_scalar_interval_op(array, sign, scalar)
-        }
-        (_, _) => Err(DataFusionError::Execution(format!(
-            "Invalid lhs type for DateIntervalExpr: {}",
-            array.data_type()
-        )))?,
-    }
-}
-
-// This function evaluates temporal array operations, such as timestamp - timestamp, interval + interval,
-// timestamp + interval, and interval + timestamp. It takes two arrays as input and an integer sign representing
-// the operation (+1 for addition and -1 for subtraction). It returns a ColumnarValue as output, which can hold
-// either a scalar or an array.
-pub fn evaluate_temporal_arrays(
-    array_lhs: &ArrayRef,
-    sign: i32,
-    array_rhs: &ArrayRef,
-) -> Result<ColumnarValue> {
-    let ret = match (array_lhs.data_type(), array_rhs.data_type()) {
-        // Timestamp - Timestamp operations, operands of only the same types are supported.
-        (DataType::Timestamp(_, _), DataType::Timestamp(_, _)) => {
-            ts_array_op(array_lhs, array_rhs)?
-        }
-        // Interval (+ , -) Interval operations
-        (DataType::Interval(_), DataType::Interval(_)) => {
-            interval_array_op(array_lhs, array_rhs, sign)?
-        }
-        // Timestamp (+ , -) Interval and Interval + Timestamp operations
-        // Interval - Timestamp operation is not rational hence not supported
-        (DataType::Timestamp(_, _), DataType::Interval(_)) => {
-            ts_interval_array_op(array_lhs, sign, array_rhs)?
-        }
-        (DataType::Interval(_), DataType::Timestamp(_, _)) if sign == 1 => {
-            ts_interval_array_op(array_rhs, sign, array_lhs)?
-        }
-        (_, _) => Err(DataFusionError::Execution(format!(
-            "Invalid array types for DateIntervalExpr: {} {} {}",
-            array_lhs.data_type(),
-            sign,
-            array_rhs.data_type()
-        )))?,
-    };
-    Ok(ColumnarValue::Array(ret))
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -309,7 +230,7 @@ mod tests {
     use arrow_array::IntervalMonthDayNanoArray;
     use chrono::{Duration, NaiveDate};
     use datafusion_common::delta::shift_months;
-    use datafusion_common::{Column, Result, ToDFSchema};
+    use datafusion_common::{Column, Result, ScalarValue, ToDFSchema};
     use datafusion_expr::Expr;
     use std::ops::Add;
 
@@ -703,7 +624,7 @@ mod tests {
     }
 
     // In this test, ArrayRef of one element arrays is evaluated with some ScalarValues,
-    // aiming that evaluate_temporal_array function is working properly and shows the same
+    // checking that resolve_temporal_op_scalar works properly and shows the same
     // behavior with ScalarValue arithmetic.
     fn experiment(
         timestamp_scalar: ScalarValue,
@@ -714,7 +635,7 @@ mod tests {
 
         // timestamp + interval
         if let ColumnarValue::Array(res1) =
-            evaluate_temporal_array(timestamp_array.clone(), 1, &interval_scalar)?
+            resolve_temporal_op_scalar(&timestamp_array, 1, &interval_scalar)?
         {
             let res2 = timestamp_scalar.add(&interval_scalar)?.to_array();
             assert_eq!(
@@ -725,7 +646,7 @@ mod tests {
 
         // timestamp - interval
         if let ColumnarValue::Array(res1) =
-            evaluate_temporal_array(timestamp_array.clone(), -1, &interval_scalar)?
+            resolve_temporal_op_scalar(&timestamp_array, -1, &interval_scalar)?
         {
             let res2 = timestamp_scalar.sub(&interval_scalar)?.to_array();
             assert_eq!(
@@ -736,7 +657,7 @@ mod tests {
 
         // timestamp - timestamp
         if let ColumnarValue::Array(res1) =
-            evaluate_temporal_array(timestamp_array.clone(), -1, &timestamp_scalar)?
+            resolve_temporal_op_scalar(&timestamp_array, -1, &timestamp_scalar)?
         {
             let res2 = timestamp_scalar.sub(&timestamp_scalar)?.to_array();
             assert_eq!(
@@ -747,7 +668,7 @@ mod tests {
 
         // interval - interval
         if let ColumnarValue::Array(res1) =
-            evaluate_temporal_array(interval_array.clone(), -1, &interval_scalar)?
+            resolve_temporal_op_scalar(&interval_array, -1, &interval_scalar)?
         {
             let res2 = interval_scalar.sub(&interval_scalar)?.to_array();
             assert_eq!(
@@ -758,7 +679,7 @@ mod tests {
 
         // interval + interval
         if let ColumnarValue::Array(res1) =
-            evaluate_temporal_array(interval_array, 1, &interval_scalar)?
+            resolve_temporal_op_scalar(&interval_array, 1, &interval_scalar)?
         {
             let res2 = interval_scalar.add(&interval_scalar)?.to_array();
             assert_eq!(