You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ak...@apache.org on 2023/04/24 08:13:52 UTC
[arrow-datafusion] branch main updated: Remove temporal to kernels_arrow (#6069)
This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 655f2a84d8 Remove temporal to kernels_arrow (#6069)
655f2a84d8 is described below
commit 655f2a84d86a45267ecdfde1860d3748309f270f
Author: Berkay Şahin <12...@users.noreply.github.com>
AuthorDate: Mon Apr 24 11:13:46 2023 +0300
Remove temporal to kernels_arrow (#6069)
* ready to review
* clippy fix
* clippy fix
* Simple code refactor
* fix clippy
* Update datafusion/physical-expr/src/expressions/datetime.rs
Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
---------
Co-authored-by: metesynnada <10...@users.noreply.github.com>
Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
---
datafusion/physical-expr/src/expressions/binary.rs | 52 +++++++--
.../src/expressions/binary/kernels_arrow.rs | 123 ++++++++++++++++++++-
.../physical-expr/src/expressions/datetime.rs | 121 ++++----------------
3 files changed, 181 insertions(+), 115 deletions(-)
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index b98a2dc420..20d1d9e0d9 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -68,14 +68,15 @@ use kernels::{
bitwise_xor, bitwise_xor_scalar,
};
use kernels_arrow::{
- add_decimal_dyn_scalar, add_dyn_decimal, divide_decimal_dyn_scalar,
- divide_dyn_opt_decimal, is_distinct_from, is_distinct_from_bool,
- is_distinct_from_decimal, is_distinct_from_f32, is_distinct_from_f64,
- is_distinct_from_null, is_distinct_from_utf8, is_not_distinct_from,
- is_not_distinct_from_bool, is_not_distinct_from_decimal, is_not_distinct_from_f32,
- is_not_distinct_from_f64, is_not_distinct_from_null, is_not_distinct_from_utf8,
- modulus_decimal_dyn_scalar, modulus_dyn_decimal, multiply_decimal_dyn_scalar,
- multiply_dyn_decimal, subtract_decimal_dyn_scalar, subtract_dyn_decimal,
+ add_decimal_dyn_scalar, add_dyn_decimal, add_dyn_temporal, add_dyn_temporal_scalar,
+ divide_decimal_dyn_scalar, divide_dyn_opt_decimal, is_distinct_from,
+ is_distinct_from_bool, is_distinct_from_decimal, is_distinct_from_f32,
+ is_distinct_from_f64, is_distinct_from_null, is_distinct_from_utf8,
+ is_not_distinct_from, is_not_distinct_from_bool, is_not_distinct_from_decimal,
+ is_not_distinct_from_f32, is_not_distinct_from_f64, is_not_distinct_from_null,
+ is_not_distinct_from_utf8, modulus_decimal_dyn_scalar, modulus_dyn_decimal,
+ multiply_decimal_dyn_scalar, multiply_dyn_decimal, subtract_decimal_dyn_scalar,
+ subtract_dyn_decimal, subtract_dyn_temporal, subtract_dyn_temporal_scalar,
};
use arrow::datatypes::{DataType, Schema, TimeUnit};
@@ -1312,10 +1313,39 @@ macro_rules! sub_timestamp_macro {
Arc::new(ret) as ArrayRef
}};
}
+
+/// Dispatches an array-array temporal operation according to `sign`.
+///
+/// `sign == 1` performs addition through `add_dyn_temporal`, and `sign == -1`
+/// performs subtraction through `subtract_dyn_temporal`.
+///
+/// # Errors
+///
+/// Returns `DataFusionError::Internal` for any other `sign` value, and propagates
+/// errors raised by the selected kernel.
+pub fn resolve_temporal_op(
+    lhs: &ArrayRef,
+    sign: i32,
+    rhs: &ArrayRef,
+) -> Result<ArrayRef> {
+    match sign {
+        1 => add_dyn_temporal(lhs, rhs),
+        -1 => subtract_dyn_temporal(lhs, rhs),
+        // Report the offending sign itself; it is not a data type.
+        other => Err(DataFusionError::Internal(format!(
+            "Undefined temporal operation for sign {other}, expected 1 or -1"
+        ))),
+    }
+}
+
+/// Dispatches an array-scalar temporal operation according to `sign`.
+///
+/// `sign == 1` performs addition through `add_dyn_temporal_scalar`, and
+/// `sign == -1` performs subtraction through `subtract_dyn_temporal_scalar`.
+///
+/// # Errors
+///
+/// Returns `DataFusionError::Internal` for any other `sign` value, and propagates
+/// errors raised by the selected kernel.
+pub fn resolve_temporal_op_scalar(
+    lhs: &ArrayRef,
+    sign: i32,
+    rhs: &ScalarValue,
+) -> Result<ColumnarValue> {
+    match sign {
+        1 => add_dyn_temporal_scalar(lhs, rhs),
+        -1 => subtract_dyn_temporal_scalar(lhs, rhs),
+        // Report the offending sign itself; it is not a data type.
+        other => Err(DataFusionError::Internal(format!(
+            "Undefined temporal operation for sign {other}, expected 1 or -1"
+        ))),
+    }
+}
+
/// This function handles the Timestamp - Timestamp operations,
/// where the first one is an array, and the second one is a scalar,
/// hence the result is also an array.
-pub fn ts_scalar_ts_op(array: ArrayRef, scalar: &ScalarValue) -> Result<ColumnarValue> {
+pub fn ts_scalar_ts_op(array: &ArrayRef, scalar: &ScalarValue) -> Result<ColumnarValue> {
let ret = match (array.data_type(), scalar) {
(
DataType::Timestamp(TimeUnit::Second, opt_tz_lhs),
@@ -1410,7 +1440,7 @@ macro_rules! sub_timestamp_interval_macro {
/// where the first one is an array, and the second one is a scalar,
/// hence the result is also an array.
pub fn ts_scalar_interval_op(
- array: ArrayRef,
+ array: &ArrayRef,
sign: i32,
scalar: &ScalarValue,
) -> Result<ColumnarValue> {
@@ -1494,7 +1524,7 @@ macro_rules! sub_interval_cross_macro {
/// where the first one is an array, and the second one is a scalar,
/// hence the result is also an interval array.
pub fn interval_scalar_interval_op(
- array: ArrayRef,
+ array: &ArrayRef,
sign: i32,
scalar: &ScalarValue,
) -> Result<ColumnarValue> {
diff --git a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
index 8998738ba5..3b93d6f792 100644
--- a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
+++ b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
@@ -21,17 +21,24 @@
use arrow::compute::{
add_dyn, add_scalar_dyn, divide_dyn_opt, divide_scalar_dyn, modulus_dyn,
modulus_scalar_dyn, multiply_dyn, multiply_scalar_dyn, subtract_dyn,
- subtract_scalar_dyn,
+ subtract_scalar_dyn, try_unary,
};
-use arrow::datatypes::Decimal128Type;
+use arrow::datatypes::{Date32Type, Date64Type, Decimal128Type};
use arrow::{array::*, datatypes::ArrowNumericType, downcast_dictionary_array};
use arrow_schema::DataType;
-use datafusion_common::cast::as_decimal128_array;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::cast::{as_date32_array, as_date64_array, as_decimal128_array};
+use datafusion_common::scalar::{date32_add, date64_add};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::type_coercion::binary::decimal_op_mathematics_type;
+use datafusion_expr::ColumnarValue;
use datafusion_expr::Operator;
use std::sync::Arc;
+use super::{
+ interval_array_op, interval_scalar_interval_op, ts_array_op, ts_interval_array_op,
+ ts_scalar_interval_op, ts_scalar_ts_op,
+};
+
// Simple (low performance) kernels until optimized kernels are added to arrow
// See https://github.com/apache/arrow-rs/issues/960
@@ -280,6 +287,57 @@ pub(crate) fn add_decimal_dyn_scalar(
decimal_array_with_precision_scale(array, precision, scale)
}
+/// Adds two temporal arrays element-wise.
+///
+/// `Interval + Interval`, `Timestamp + Interval` and `Interval + Timestamp` are
+/// handled by the DataFusion kernels. `Timestamp + Timestamp` is rejected, and any
+/// other combination is delegated to arrow-rs' `add_dyn`.
+pub(crate) fn add_dyn_temporal(left: &ArrayRef, right: &ArrayRef) -> Result<ArrayRef> {
+    match (left.data_type(), right.data_type()) {
+        // `ts_array_op` is the Timestamp - Timestamp kernel; using it here would
+        // silently return a difference for an addition, so reject the operation.
+        (DataType::Timestamp(..), DataType::Timestamp(..)) => {
+            Err(DataFusionError::Execution(format!(
+                "Invalid array types for temporal addition: {} + {}",
+                left.data_type(),
+                right.data_type()
+            )))
+        }
+        (DataType::Interval(..), DataType::Interval(..)) => {
+            interval_array_op(left, right, 1)
+        }
+        (DataType::Timestamp(..), DataType::Interval(..)) => {
+            ts_interval_array_op(left, 1, right)
+        }
+        // Addition is commutative, so reuse the `Timestamp + Interval` kernel.
+        (DataType::Interval(..), DataType::Timestamp(..)) => {
+            ts_interval_array_op(right, 1, left)
+        }
+        _ => {
+            // fall back to kernels in arrow-rs
+            Ok(add_dyn(left, right)?)
+        }
+    }
+}
+
+/// Adds a temporal scalar to every element of a temporal array.
+///
+/// `Date32 + Interval`, `Date64 + Interval`, `Interval + Interval` and
+/// `Timestamp + Interval` are handled by the DataFusion kernels.
+/// `Timestamp + Timestamp` is rejected. Any other combination is delegated to
+/// arrow-rs' `add_dyn`, with the scalar broadcast to the length of `left`.
+pub(crate) fn add_dyn_temporal_scalar(
+    left: &ArrayRef,
+    right: &ScalarValue,
+) -> Result<ColumnarValue> {
+    match (left.data_type(), right.get_datatype()) {
+        (DataType::Date32, DataType::Interval(..)) => {
+            let left = as_date32_array(left)?;
+            let ret = Arc::new(try_unary::<Date32Type, _, Date32Type>(left, |days| {
+                Ok(date32_add(days, right, 1)?)
+            })?) as ArrayRef;
+            Ok(ColumnarValue::Array(ret))
+        }
+        (DataType::Date64, DataType::Interval(..)) => {
+            let left = as_date64_array(left)?;
+            let ret = Arc::new(try_unary::<Date64Type, _, Date64Type>(left, |ms| {
+                Ok(date64_add(ms, right, 1)?)
+            })?) as ArrayRef;
+            Ok(ColumnarValue::Array(ret))
+        }
+        // Adding two timestamps is meaningless; reject it explicitly instead of
+        // letting it reach the generic fallback kernel.
+        (DataType::Timestamp(..), DataType::Timestamp(..)) => {
+            Err(DataFusionError::Execution(format!(
+                "Invalid types for temporal addition: {} + {}",
+                left.data_type(),
+                right.get_datatype()
+            )))
+        }
+        (DataType::Interval(..), DataType::Interval(..)) => {
+            interval_scalar_interval_op(left, 1, right)
+        }
+        (DataType::Timestamp(..), DataType::Interval(..)) => {
+            ts_scalar_interval_op(left, 1, right)
+        }
+        _ => {
+            // Fall back to kernels in arrow-rs. Broadcast the scalar to
+            // `left.len()` so that both operands have the same length.
+            let right = right.to_array_of_size(left.len());
+            Ok(ColumnarValue::Array(add_dyn(left, &right)?))
+        }
+    }
+}
+
pub(crate) fn subtract_decimal_dyn_scalar(
left: &dyn Array,
right: i128,
@@ -291,6 +349,63 @@ pub(crate) fn subtract_decimal_dyn_scalar(
decimal_array_with_precision_scale(array, precision, scale)
}
+/// Subtracts two temporal arrays element-wise.
+///
+/// `Timestamp - Timestamp`, `Interval - Interval` and `Timestamp - Interval` are
+/// handled by the DataFusion kernels. `Interval - Timestamp` has no meaning and is
+/// rejected. Any other combination is delegated to arrow-rs' `subtract_dyn`.
+pub(crate) fn subtract_dyn_temporal(
+    left: &ArrayRef,
+    right: &ArrayRef,
+) -> Result<ArrayRef> {
+    match (left.data_type(), right.data_type()) {
+        (DataType::Timestamp(..), DataType::Timestamp(..)) => ts_array_op(left, right),
+        (DataType::Interval(..), DataType::Interval(..)) => {
+            interval_array_op(left, right, -1)
+        }
+        (DataType::Timestamp(..), DataType::Interval(..)) => {
+            ts_interval_array_op(left, -1, right)
+        }
+        // Subtraction is not commutative: swapping the operands here would
+        // silently compute `Timestamp - Interval`, so reject the operation.
+        (DataType::Interval(..), DataType::Timestamp(..)) => {
+            Err(DataFusionError::Execution(format!(
+                "Invalid array types for temporal subtraction: {} - {}",
+                left.data_type(),
+                right.data_type()
+            )))
+        }
+        _ => {
+            // fall back to kernels in arrow-rs
+            Ok(subtract_dyn(left, right)?)
+        }
+    }
+}
+
+/// Subtracts a temporal scalar from every element of a temporal array.
+///
+/// `Date32 - Interval`, `Date64 - Interval`, `Timestamp - Timestamp`,
+/// `Interval - Interval` and `Timestamp - Interval` are handled by the DataFusion
+/// kernels. Any other combination is delegated to arrow-rs' `subtract_dyn`, with
+/// the scalar broadcast to the length of `left`.
+pub(crate) fn subtract_dyn_temporal_scalar(
+    left: &ArrayRef,
+    right: &ScalarValue,
+) -> Result<ColumnarValue> {
+    match (left.data_type(), right.get_datatype()) {
+        (DataType::Date32, DataType::Interval(..)) => {
+            let left = as_date32_array(left)?;
+            let ret = Arc::new(try_unary::<Date32Type, _, Date32Type>(left, |days| {
+                Ok(date32_add(days, right, -1)?)
+            })?) as ArrayRef;
+            Ok(ColumnarValue::Array(ret))
+        }
+        (DataType::Date64, DataType::Interval(..)) => {
+            let left = as_date64_array(left)?;
+            let ret = Arc::new(try_unary::<Date64Type, _, Date64Type>(left, |ms| {
+                Ok(date64_add(ms, right, -1)?)
+            })?) as ArrayRef;
+            Ok(ColumnarValue::Array(ret))
+        }
+        (DataType::Timestamp(..), DataType::Timestamp(..)) => {
+            ts_scalar_ts_op(left, right)
+        }
+        (DataType::Interval(..), DataType::Interval(..)) => {
+            interval_scalar_interval_op(left, -1, right)
+        }
+        (DataType::Timestamp(..), DataType::Interval(..)) => {
+            ts_scalar_interval_op(left, -1, right)
+        }
+        _ => {
+            // Fall back to kernels in arrow-rs. Broadcast the scalar to
+            // `left.len()` so that both operands have the same length.
+            let right = right.to_array_of_size(left.len());
+            Ok(ColumnarValue::Array(subtract_dyn(left, &right)?))
+        }
+    }
+}
+
fn get_precision_scale(data_type: &DataType) -> Result<(u8, i8)> {
match data_type {
DataType::Decimal128(precision, scale) => Ok((*precision, *scale)),
diff --git a/datafusion/physical-expr/src/expressions/datetime.rs b/datafusion/physical-expr/src/expressions/datetime.rs
index 4668745416..25f02be3b1 100644
--- a/datafusion/physical-expr/src/expressions/datetime.rs
+++ b/datafusion/physical-expr/src/expressions/datetime.rs
@@ -19,25 +19,17 @@ use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
use crate::intervals::{apply_operator, Interval};
use crate::physical_expr::down_cast_any_ref;
use crate::PhysicalExpr;
-use arrow::array::{Array, ArrayRef};
-use arrow::compute::try_unary;
-use arrow::datatypes::{DataType, Date32Type, Date64Type, Schema};
+use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
-use datafusion_common::cast::*;
-use datafusion_common::scalar::*;
-use datafusion_common::Result;
-use datafusion_common::{DataFusionError, ScalarValue};
+use datafusion_common::{DataFusionError, Result};
use datafusion_expr::type_coercion::binary::coerce_types;
use datafusion_expr::{ColumnarValue, Operator};
use std::any::Any;
use std::fmt::{Display, Formatter};
use std::sync::Arc;
-use super::binary::{
- interval_array_op, interval_scalar_interval_op, ts_array_op, ts_interval_array_op,
- ts_scalar_interval_op, ts_scalar_ts_op,
-};
+use super::binary::{resolve_temporal_op, resolve_temporal_op_scalar};
/// Perform DATE/TIME/TIMESTAMP +/ INTERVAL math
#[derive(Debug)]
@@ -151,13 +143,18 @@ impl PhysicalExpr for DateTimeIntervalExpr {
operand_lhs.sub(&operand_rhs)?
}))
}
- (ColumnarValue::Array(array_lhs), ColumnarValue::Scalar(operand_rhs)) => {
- evaluate_temporal_array(array_lhs, sign, &operand_rhs)
- }
-
- (ColumnarValue::Array(array_lhs), ColumnarValue::Array(array_rhs)) => {
- evaluate_temporal_arrays(&array_lhs, sign, &array_rhs)
+ // This function evaluates temporal array vs scalar operations, such as timestamp - timestamp,
+ // interval + interval, timestamp + interval, and interval + timestamp. It takes one array and one scalar as input
+ // and an integer sign representing the operation (+1 for addition and -1 for subtraction).
+ (ColumnarValue::Array(array_lhs), ColumnarValue::Scalar(array_rhs)) => {
+ resolve_temporal_op_scalar(&array_lhs, sign, &array_rhs)
}
+ // This function evaluates temporal array operations, such as timestamp - timestamp, interval + interval,
+ // timestamp + interval, and interval + timestamp. It takes two arrays as input and an integer sign representing
+ // the operation (+1 for addition and -1 for subtraction).
+ (ColumnarValue::Array(array_lhs), ColumnarValue::Array(array_rhs)) => Ok(
+ ColumnarValue::Array(resolve_temporal_op(&array_lhs, sign, &array_rhs)?),
+ ),
(_, _) => {
let msg = "If RHS of the operation is an array, then LHS also must be";
Err(DataFusionError::Internal(msg.to_string()))
@@ -223,82 +220,6 @@ impl PartialEq<dyn Any> for DateTimeIntervalExpr {
}
}
-pub fn evaluate_temporal_array(
- array: ArrayRef,
- sign: i32,
- scalar: &ScalarValue,
-) -> Result<ColumnarValue> {
- match (array.data_type(), scalar.get_datatype()) {
- // Date +- Interval
- (DataType::Date32, DataType::Interval(_)) => {
- let array = as_date32_array(&array)?;
- let ret = Arc::new(try_unary::<Date32Type, _, Date32Type>(array, |days| {
- Ok(date32_add(days, scalar, sign)?)
- })?) as ArrayRef;
- Ok(ColumnarValue::Array(ret))
- }
- (DataType::Date64, DataType::Interval(_)) => {
- let array = as_date64_array(&array)?;
- let ret = Arc::new(try_unary::<Date64Type, _, Date64Type>(array, |ms| {
- Ok(date64_add(ms, scalar, sign)?)
- })?) as ArrayRef;
- Ok(ColumnarValue::Array(ret))
- }
- // Timestamp - Timestamp
- (DataType::Timestamp(_, _), DataType::Timestamp(_, _)) if sign == -1 => {
- ts_scalar_ts_op(array, scalar)
- }
- // Interval +- Interval
- (DataType::Interval(_), DataType::Interval(_)) => {
- interval_scalar_interval_op(array, sign, scalar)
- }
- // Timestamp +- Interval
- (DataType::Timestamp(_, _), DataType::Interval(_)) => {
- ts_scalar_interval_op(array, sign, scalar)
- }
- (_, _) => Err(DataFusionError::Execution(format!(
- "Invalid lhs type for DateIntervalExpr: {}",
- array.data_type()
- )))?,
- }
-}
-
-// This function evaluates temporal array operations, such as timestamp - timestamp, interval + interval,
-// timestamp + interval, and interval + timestamp. It takes two arrays as input and an integer sign representing
-// the operation (+1 for addition and -1 for subtraction). It returns a ColumnarValue as output, which can hold
-// either a scalar or an array.
-pub fn evaluate_temporal_arrays(
- array_lhs: &ArrayRef,
- sign: i32,
- array_rhs: &ArrayRef,
-) -> Result<ColumnarValue> {
- let ret = match (array_lhs.data_type(), array_rhs.data_type()) {
- // Timestamp - Timestamp operations, operands of only the same types are supported.
- (DataType::Timestamp(_, _), DataType::Timestamp(_, _)) => {
- ts_array_op(array_lhs, array_rhs)?
- }
- // Interval (+ , -) Interval operations
- (DataType::Interval(_), DataType::Interval(_)) => {
- interval_array_op(array_lhs, array_rhs, sign)?
- }
- // Timestamp (+ , -) Interval and Interval + Timestamp operations
- // Interval - Timestamp operation is not rational hence not supported
- (DataType::Timestamp(_, _), DataType::Interval(_)) => {
- ts_interval_array_op(array_lhs, sign, array_rhs)?
- }
- (DataType::Interval(_), DataType::Timestamp(_, _)) if sign == 1 => {
- ts_interval_array_op(array_rhs, sign, array_lhs)?
- }
- (_, _) => Err(DataFusionError::Execution(format!(
- "Invalid array types for DateIntervalExpr: {} {} {}",
- array_lhs.data_type(),
- sign,
- array_rhs.data_type()
- )))?,
- };
- Ok(ColumnarValue::Array(ret))
-}
-
#[cfg(test)]
mod tests {
use super::*;
@@ -309,7 +230,7 @@ mod tests {
use arrow_array::IntervalMonthDayNanoArray;
use chrono::{Duration, NaiveDate};
use datafusion_common::delta::shift_months;
- use datafusion_common::{Column, Result, ToDFSchema};
+ use datafusion_common::{Column, Result, ScalarValue, ToDFSchema};
use datafusion_expr::Expr;
use std::ops::Add;
@@ -703,7 +624,7 @@ mod tests {
}
// In this test, ArrayRef of one element arrays is evaluated with some ScalarValues,
- // aiming that evaluate_temporal_array function is working properly and shows the same
+ // aiming that resolve_temporal_op_scalar function is working properly and shows the same
// behavior with ScalarValue arithmetic.
fn experiment(
timestamp_scalar: ScalarValue,
@@ -714,7 +635,7 @@ mod tests {
// timestamp + interval
if let ColumnarValue::Array(res1) =
- evaluate_temporal_array(timestamp_array.clone(), 1, &interval_scalar)?
+ resolve_temporal_op_scalar(&timestamp_array, 1, &interval_scalar)?
{
let res2 = timestamp_scalar.add(&interval_scalar)?.to_array();
assert_eq!(
@@ -725,7 +646,7 @@ mod tests {
// timestamp - interval
if let ColumnarValue::Array(res1) =
- evaluate_temporal_array(timestamp_array.clone(), -1, &interval_scalar)?
+ resolve_temporal_op_scalar(&timestamp_array, -1, &interval_scalar)?
{
let res2 = timestamp_scalar.sub(&interval_scalar)?.to_array();
assert_eq!(
@@ -736,7 +657,7 @@ mod tests {
// timestamp - timestamp
if let ColumnarValue::Array(res1) =
- evaluate_temporal_array(timestamp_array.clone(), -1, &timestamp_scalar)?
+ resolve_temporal_op_scalar(&timestamp_array, -1, &timestamp_scalar)?
{
let res2 = timestamp_scalar.sub(&timestamp_scalar)?.to_array();
assert_eq!(
@@ -747,7 +668,7 @@ mod tests {
// interval - interval
if let ColumnarValue::Array(res1) =
- evaluate_temporal_array(interval_array.clone(), -1, &interval_scalar)?
+ resolve_temporal_op_scalar(&interval_array, -1, &interval_scalar)?
{
let res2 = interval_scalar.sub(&interval_scalar)?.to_array();
assert_eq!(
@@ -758,7 +679,7 @@ mod tests {
// interval + interval
if let ColumnarValue::Array(res1) =
- evaluate_temporal_array(interval_array, 1, &interval_scalar)?
+ resolve_temporal_op_scalar(&interval_array, 1, &interval_scalar)?
{
let res2 = interval_scalar.add(&interval_scalar)?.to_array();
assert_eq!(