You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ak...@apache.org on 2023/05/30 14:26:19 UTC
[arrow-datafusion] branch main updated: Refactor temporal arithmetic (#6433)
This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0abd4a272f Refactor temporal arithmetic (#6433)
0abd4a272f is described below
commit 0abd4a272f471c0d9cedf1aa65fad677f4a7717e
Author: Berkay Şahin <12...@users.noreply.github.com>
AuthorDate: Tue May 30 17:26:12 2023 +0300
Refactor temporal arithmetic (#6433)
* Subtraction is ok, addition is in progress
* Addition is also ok for scalar vs array temporals
* simplifications
* unit tests are extended
* Code style improvements
* Code simplification
* Macros written to remove duplication, unit tests added, operations moved to kernels_arrow
---------
Co-authored-by: Mustafa Akur <mu...@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
datafusion/common/src/delta.rs | 103 +-
datafusion/common/src/scalar.rs | 18 +-
datafusion/optimizer/src/analyzer/type_coercion.rs | 2 +-
datafusion/physical-expr/src/expressions/binary.rs | 1012 ++-----------
.../src/expressions/binary/kernels_arrow.rs | 1530 +++++++++++++++++++-
.../physical-expr/src/expressions/datetime.rs | 290 +++-
6 files changed, 1892 insertions(+), 1063 deletions(-)
diff --git a/datafusion/common/src/delta.rs b/datafusion/common/src/delta.rs
index cc723a9943..bb71e3eb93 100644
--- a/datafusion/common/src/delta.rs
+++ b/datafusion/common/src/delta.rs
@@ -49,7 +49,8 @@ fn normalise_day(year: i32, month: u32, day: u32) -> u32 {
/// Shift a date by the given number of months.
/// Ambiguous month-ends are shifted backwards as necessary.
-pub fn shift_months<D: Datelike>(date: D, months: i32) -> D {
+pub fn shift_months<D: Datelike>(date: D, months: i32, sign: i32) -> D {
+ let months = months * sign;
let mut year = date.year() + (date.month() as i32 + months) / 12;
let mut month = (date.month() as i32 + months) % 12;
let mut day = date.day();
@@ -108,117 +109,117 @@ mod tests {
let base = NaiveDate::from_ymd_opt(2020, 1, 31).unwrap();
assert_eq!(
- shift_months(base, 0),
+ shift_months(base, 0, 1),
NaiveDate::from_ymd_opt(2020, 1, 31).unwrap()
);
assert_eq!(
- shift_months(base, 1),
+ shift_months(base, 1, 1),
NaiveDate::from_ymd_opt(2020, 2, 29).unwrap()
);
assert_eq!(
- shift_months(base, 2),
+ shift_months(base, 2, 1),
NaiveDate::from_ymd_opt(2020, 3, 31).unwrap()
);
assert_eq!(
- shift_months(base, 3),
+ shift_months(base, 3, 1),
NaiveDate::from_ymd_opt(2020, 4, 30).unwrap()
);
assert_eq!(
- shift_months(base, 4),
+ shift_months(base, 4, 1),
NaiveDate::from_ymd_opt(2020, 5, 31).unwrap()
);
assert_eq!(
- shift_months(base, 5),
+ shift_months(base, 5, 1),
NaiveDate::from_ymd_opt(2020, 6, 30).unwrap()
);
assert_eq!(
- shift_months(base, 6),
+ shift_months(base, 6, 1),
NaiveDate::from_ymd_opt(2020, 7, 31).unwrap()
);
assert_eq!(
- shift_months(base, 7),
+ shift_months(base, 7, 1),
NaiveDate::from_ymd_opt(2020, 8, 31).unwrap()
);
assert_eq!(
- shift_months(base, 8),
+ shift_months(base, 8, 1),
NaiveDate::from_ymd_opt(2020, 9, 30).unwrap()
);
assert_eq!(
- shift_months(base, 9),
+ shift_months(base, 9, 1),
NaiveDate::from_ymd_opt(2020, 10, 31).unwrap()
);
assert_eq!(
- shift_months(base, 10),
+ shift_months(base, 10, 1),
NaiveDate::from_ymd_opt(2020, 11, 30).unwrap()
);
assert_eq!(
- shift_months(base, 11),
+ shift_months(base, 11, 1),
NaiveDate::from_ymd_opt(2020, 12, 31).unwrap()
);
assert_eq!(
- shift_months(base, 12),
+ shift_months(base, 12, 1),
NaiveDate::from_ymd_opt(2021, 1, 31).unwrap()
);
assert_eq!(
- shift_months(base, 13),
+ shift_months(base, 13, 1),
NaiveDate::from_ymd_opt(2021, 2, 28).unwrap()
);
assert_eq!(
- shift_months(base, -1),
+ shift_months(base, 1, -1),
NaiveDate::from_ymd_opt(2019, 12, 31).unwrap()
);
assert_eq!(
- shift_months(base, -2),
+ shift_months(base, 2, -1),
NaiveDate::from_ymd_opt(2019, 11, 30).unwrap()
);
assert_eq!(
- shift_months(base, -3),
+ shift_months(base, 3, -1),
NaiveDate::from_ymd_opt(2019, 10, 31).unwrap()
);
assert_eq!(
- shift_months(base, -4),
+ shift_months(base, 4, -1),
NaiveDate::from_ymd_opt(2019, 9, 30).unwrap()
);
assert_eq!(
- shift_months(base, -5),
+ shift_months(base, 5, -1),
NaiveDate::from_ymd_opt(2019, 8, 31).unwrap()
);
assert_eq!(
- shift_months(base, -6),
+ shift_months(base, 6, -1),
NaiveDate::from_ymd_opt(2019, 7, 31).unwrap()
);
assert_eq!(
- shift_months(base, -7),
+ shift_months(base, 7, -1),
NaiveDate::from_ymd_opt(2019, 6, 30).unwrap()
);
assert_eq!(
- shift_months(base, -8),
+ shift_months(base, 8, -1),
NaiveDate::from_ymd_opt(2019, 5, 31).unwrap()
);
assert_eq!(
- shift_months(base, -9),
+ shift_months(base, 9, -1),
NaiveDate::from_ymd_opt(2019, 4, 30).unwrap()
);
assert_eq!(
- shift_months(base, -10),
+ shift_months(base, 10, -1),
NaiveDate::from_ymd_opt(2019, 3, 31).unwrap()
);
assert_eq!(
- shift_months(base, -11),
+ shift_months(base, 11, -1),
NaiveDate::from_ymd_opt(2019, 2, 28).unwrap()
);
assert_eq!(
- shift_months(base, -12),
+ shift_months(base, 12, -1),
NaiveDate::from_ymd_opt(2019, 1, 31).unwrap()
);
assert_eq!(
- shift_months(base, -13),
+ shift_months(base, 13, -1),
NaiveDate::from_ymd_opt(2018, 12, 31).unwrap()
);
assert_eq!(
- shift_months(base, 1265),
+ shift_months(base, 1265, 1),
NaiveDate::from_ymd_opt(2125, 6, 30).unwrap()
);
}
@@ -227,42 +228,42 @@ mod tests {
fn test_shift_months_with_overflow() {
let base = NaiveDate::from_ymd_opt(2020, 12, 31).unwrap();
- assert_eq!(shift_months(base, 0), base);
+ assert_eq!(shift_months(base, 0, 1), base);
assert_eq!(
- shift_months(base, 1),
+ shift_months(base, 1, 1),
NaiveDate::from_ymd_opt(2021, 1, 31).unwrap()
);
assert_eq!(
- shift_months(base, 2),
+ shift_months(base, 2, 1),
NaiveDate::from_ymd_opt(2021, 2, 28).unwrap()
);
assert_eq!(
- shift_months(base, 12),
+ shift_months(base, 12, 1),
NaiveDate::from_ymd_opt(2021, 12, 31).unwrap()
);
assert_eq!(
- shift_months(base, 18),
+ shift_months(base, 18, 1),
NaiveDate::from_ymd_opt(2022, 6, 30).unwrap()
);
assert_eq!(
- shift_months(base, -1),
+ shift_months(base, 1, -1),
NaiveDate::from_ymd_opt(2020, 11, 30).unwrap()
);
assert_eq!(
- shift_months(base, -2),
+ shift_months(base, 2, -1),
NaiveDate::from_ymd_opt(2020, 10, 31).unwrap()
);
assert_eq!(
- shift_months(base, -10),
+ shift_months(base, 10, -1),
NaiveDate::from_ymd_opt(2020, 2, 29).unwrap()
);
assert_eq!(
- shift_months(base, -12),
+ shift_months(base, 12, -1),
NaiveDate::from_ymd_opt(2019, 12, 31).unwrap()
);
assert_eq!(
- shift_months(base, -18),
+ shift_months(base, 18, -1),
NaiveDate::from_ymd_opt(2019, 6, 30).unwrap()
);
}
@@ -275,61 +276,61 @@ mod tests {
let base = NaiveDateTime::new(date, o_clock);
assert_eq!(
- shift_months(base, 0).date(),
+ shift_months(base, 0, 1).date(),
NaiveDate::from_ymd_opt(2020, 1, 31).unwrap(),
);
assert_eq!(
- shift_months(base, 1).date(),
+ shift_months(base, 1, 1).date(),
NaiveDate::from_ymd_opt(2020, 2, 29).unwrap(),
);
assert_eq!(
- shift_months(base, 2).date(),
+ shift_months(base, 2, 1).date(),
NaiveDate::from_ymd_opt(2020, 3, 31).unwrap(),
);
- assert_eq!(shift_months(base, 0).time(), o_clock);
- assert_eq!(shift_months(base, 1).time(), o_clock);
- assert_eq!(shift_months(base, 2).time(), o_clock);
+ assert_eq!(shift_months(base, 0, 1).time(), o_clock);
+ assert_eq!(shift_months(base, 1, 1).time(), o_clock);
+ assert_eq!(shift_months(base, 2, 1).time(), o_clock);
}
#[test]
fn add_11_months() {
let prior = NaiveDate::from_ymd_opt(2000, 1, 1).unwrap();
- let actual = shift_months(prior, 11);
+ let actual = shift_months(prior, 11, 1);
assert_eq!(format!("{actual:?}").as_str(), "2000-12-01");
}
#[test]
fn add_12_months() {
let prior = NaiveDate::from_ymd_opt(2000, 1, 1).unwrap();
- let actual = shift_months(prior, 12);
+ let actual = shift_months(prior, 12, 1);
assert_eq!(format!("{actual:?}").as_str(), "2001-01-01");
}
#[test]
fn add_13_months() {
let prior = NaiveDate::from_ymd_opt(2000, 1, 1).unwrap();
- let actual = shift_months(prior, 13);
+ let actual = shift_months(prior, 13, 1);
assert_eq!(format!("{actual:?}").as_str(), "2001-02-01");
}
#[test]
fn sub_11_months() {
let prior = NaiveDate::from_ymd_opt(2000, 1, 1).unwrap();
- let actual = shift_months(prior, -11);
+ let actual = shift_months(prior, 11, -1);
assert_eq!(format!("{actual:?}").as_str(), "1999-02-01");
}
#[test]
fn sub_12_months() {
let prior = NaiveDate::from_ymd_opt(2000, 1, 1).unwrap();
- let actual = shift_months(prior, -12);
+ let actual = shift_months(prior, 12, -1);
assert_eq!(format!("{actual:?}").as_str(), "1999-01-01");
}
#[test]
fn sub_13_months() {
let prior = NaiveDate::from_ymd_opt(2000, 1, 1).unwrap();
- let actual = shift_months(prior, -13);
+ let actual = shift_months(prior, 13, -1);
assert_eq!(format!("{actual:?}").as_str(), "1998-12-01");
}
}
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 13f9d9f900..724b21787f 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -907,11 +907,11 @@ macro_rules! impl_op_arithmetic {
)))),
// Binary operations on arguments with different types:
(ScalarValue::Date32(Some(days)), _) => {
- let value = date32_add(*days, $RHS, get_sign!($OPERATION))?;
+ let value = date32_op(*days, $RHS, get_sign!($OPERATION))?;
Ok(ScalarValue::Date32(Some(value)))
}
(ScalarValue::Date64(Some(ms)), _) => {
- let value = date64_add(*ms, $RHS, get_sign!($OPERATION))?;
+ let value = date64_op(*ms, $RHS, get_sign!($OPERATION))?;
Ok(ScalarValue::Date64(Some(value)))
}
(ScalarValue::TimestampSecond(Some(ts_s), zone), _) => {
@@ -1247,14 +1247,14 @@ pub fn calculate_naives<const TIME_MODE: bool>(
}
#[inline]
-pub fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {
+pub fn date32_op(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
let prior = epoch.add(Duration::days(days as i64));
do_date_math(prior, scalar, sign).map(|d| d.sub(epoch).num_days() as i32)
}
#[inline]
-pub fn date64_add(ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+pub fn date64_op(ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
let prior = epoch.add(Duration::milliseconds(ms));
do_date_math(prior, scalar, sign).map(|d| d.sub(epoch).num_milliseconds())
@@ -1398,7 +1398,7 @@ where
{
Ok(match scalar {
ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
- ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * sign),
+ ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i, sign),
ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign),
other => Err(DataFusionError::Execution(format!(
"DateIntervalExpr does not support non-interval type {other:?}"
@@ -1415,7 +1415,7 @@ where
D: Datelike + Add<Duration, Output = D>,
{
Ok(match INTERVAL_MODE {
- YM_MODE => shift_months(prior, interval as i32 * sign),
+ YM_MODE => shift_months(prior, interval as i32, sign),
DT_MODE => add_day_time(prior, interval as i64, sign),
MDN_MODE => add_m_d_nano(prior, interval, sign),
_ => {
@@ -1427,7 +1427,7 @@ where
}
// Can remove once chrono:0.4.23 is released
-fn add_m_d_nano<D>(prior: D, interval: i128, sign: i32) -> D
+pub fn add_m_d_nano<D>(prior: D, interval: i128, sign: i32) -> D
where
D: Datelike + Add<Duration, Output = D>,
{
@@ -1435,13 +1435,13 @@ where
let months = months * sign;
let days = days * sign;
let nanos = nanos * sign as i64;
- let a = shift_months(prior, months);
+ let a = shift_months(prior, months, 1);
let b = a.add(Duration::days(days as i64));
b.add(Duration::nanoseconds(nanos))
}
// Can remove once chrono:0.4.23 is released
-fn add_day_time<D>(prior: D, interval: i64, sign: i32) -> D
+pub fn add_day_time<D>(prior: D, interval: i64, sign: i32) -> D
where
D: Datelike + Add<Duration, Output = D>,
{
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 1761d237c3..da033a54ae 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -986,7 +986,7 @@ mod test {
}
#[test]
- fn binary_op_date32_add_interval() -> Result<()> {
+ fn binary_op_date32_op_interval() -> Result<()> {
//CAST(Utf8("1998-03-18") AS Date32) + IntervalDayTime("386547056640")
let expr = cast(lit("1998-03-18"), DataType::Date32)
+ lit(ScalarValue::IntervalDayTime(Some(386547056640)));
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index 295d5fe655..e5b66d4a39 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -21,8 +21,6 @@ mod kernels_arrow;
use std::{any::Any, sync::Arc};
-use chrono::NaiveDateTime;
-
use arrow::array::*;
use arrow::compute::kernels::arithmetic::{
add_dyn, add_scalar_dyn as add_dyn_scalar, divide_dyn_opt,
@@ -50,17 +48,12 @@ use arrow::compute::kernels::comparison::{
eq_dyn_utf8_scalar, gt_dyn_utf8_scalar, gt_eq_dyn_utf8_scalar, lt_dyn_utf8_scalar,
lt_eq_dyn_utf8_scalar, neq_dyn_utf8_scalar,
};
-use arrow::compute::{cast, try_unary, unary, CastOptions};
+use arrow::compute::{cast, CastOptions};
use arrow::datatypes::*;
use adapter::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn};
use arrow::compute::kernels::concat_elements::concat_elements_utf8;
-use datafusion_common::scalar::{
- calculate_naives, microseconds_add, microseconds_sub, milliseconds_add,
- milliseconds_sub, nanoseconds_add, nanoseconds_sub, op_dt, op_dt_mdn, op_mdn, op_ym,
- op_ym_dt, op_ym_mdn, parse_timezones, seconds_add, seconds_sub, MILLISECOND_MODE,
- NANOSECOND_MODE,
-};
+
use datafusion_expr::type_coercion::{is_decimal, is_timestamp, is_utf8_or_large_utf8};
use kernels::{
bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar,
@@ -68,21 +61,25 @@ use kernels::{
bitwise_shift_right_dyn_scalar, bitwise_xor_dyn, bitwise_xor_dyn_scalar,
};
use kernels_arrow::{
- add_decimal_dyn_scalar, add_dyn_decimal, add_dyn_temporal, add_dyn_temporal_scalar,
- divide_decimal_dyn_scalar, divide_dyn_opt_decimal, is_distinct_from,
- is_distinct_from_binary, is_distinct_from_bool, is_distinct_from_decimal,
- is_distinct_from_f32, is_distinct_from_f64, is_distinct_from_null,
- is_distinct_from_utf8, is_not_distinct_from, is_not_distinct_from_binary,
- is_not_distinct_from_bool, is_not_distinct_from_decimal, is_not_distinct_from_f32,
- is_not_distinct_from_f64, is_not_distinct_from_null, is_not_distinct_from_utf8,
- modulus_decimal_dyn_scalar, modulus_dyn_decimal, multiply_decimal_dyn_scalar,
- multiply_dyn_decimal, subtract_decimal_dyn_scalar, subtract_dyn_decimal,
- subtract_dyn_temporal, subtract_dyn_temporal_scalar,
+ add_decimal_dyn_scalar, add_dyn_decimal, add_dyn_temporal, divide_decimal_dyn_scalar,
+ divide_dyn_opt_decimal, is_distinct_from, is_distinct_from_binary,
+ is_distinct_from_bool, is_distinct_from_decimal, is_distinct_from_f32,
+ is_distinct_from_f64, is_distinct_from_null, is_distinct_from_utf8,
+ is_not_distinct_from, is_not_distinct_from_binary, is_not_distinct_from_bool,
+ is_not_distinct_from_decimal, is_not_distinct_from_f32, is_not_distinct_from_f64,
+ is_not_distinct_from_null, is_not_distinct_from_utf8, modulus_decimal_dyn_scalar,
+ modulus_dyn_decimal, multiply_decimal_dyn_scalar, multiply_dyn_decimal,
+ subtract_decimal_dyn_scalar, subtract_dyn_decimal, subtract_dyn_temporal,
};
use arrow::datatypes::{DataType, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
+use self::kernels_arrow::{
+ add_dyn_temporal_left_scalar, add_dyn_temporal_right_scalar,
+ subtract_dyn_temporal_left_scalar, subtract_dyn_temporal_right_scalar,
+};
+
use super::column::Column;
use crate::expressions::cast_column;
use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
@@ -90,12 +87,7 @@ use crate::intervals::{apply_operator, Interval};
use crate::physical_expr::down_cast_any_ref;
use crate::{analysis_expect, AnalysisContext, ExprBoundaries, PhysicalExpr};
use datafusion_common::cast::as_boolean_array;
-use datafusion_common::cast::{
- as_interval_dt_array, as_interval_mdn_array, as_interval_ym_array,
- as_timestamp_microsecond_array, as_timestamp_millisecond_array,
- as_timestamp_nanosecond_array, as_timestamp_second_array,
-};
-use datafusion_common::scalar::*;
+
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::type_coercion::binary::{
@@ -1089,84 +1081,69 @@ impl BinaryExpr {
scalar: ScalarValue,
result_type: &DataType,
) -> Result<Option<Result<ArrayRef>>> {
+ use Operator::*;
let bool_type = &DataType::Boolean;
let scalar_result = match &self.op {
- Operator::Lt => {
- binary_array_op_dyn_scalar!(array, scalar, lt, bool_type)
- }
- Operator::LtEq => {
- binary_array_op_dyn_scalar!(array, scalar, lt_eq, bool_type)
- }
- Operator::Gt => {
- binary_array_op_dyn_scalar!(array, scalar, gt, bool_type)
- }
- Operator::GtEq => {
- binary_array_op_dyn_scalar!(array, scalar, gt_eq, bool_type)
- }
- Operator::Eq => {
- binary_array_op_dyn_scalar!(array, scalar, eq, bool_type)
- }
- Operator::NotEq => {
- binary_array_op_dyn_scalar!(array, scalar, neq, bool_type)
- }
- Operator::Plus => {
+ Lt => binary_array_op_dyn_scalar!(array, scalar, lt, bool_type),
+ LtEq => binary_array_op_dyn_scalar!(array, scalar, lt_eq, bool_type),
+ Gt => binary_array_op_dyn_scalar!(array, scalar, gt, bool_type),
+ GtEq => binary_array_op_dyn_scalar!(array, scalar, gt_eq, bool_type),
+ Eq => binary_array_op_dyn_scalar!(array, scalar, eq, bool_type),
+ NotEq => binary_array_op_dyn_scalar!(array, scalar, neq, bool_type),
+ Plus => {
binary_primitive_array_op_dyn_scalar!(array, scalar, add, result_type)
}
- Operator::Minus => {
- binary_primitive_array_op_dyn_scalar!(
- array,
- scalar,
- subtract,
- result_type
- )
- }
- Operator::Multiply => {
- binary_primitive_array_op_dyn_scalar!(
- array,
- scalar,
- multiply,
- result_type
- )
- }
- Operator::Divide => {
+ Minus => binary_primitive_array_op_dyn_scalar!(
+ array,
+ scalar,
+ subtract,
+ result_type
+ ),
+ Multiply => binary_primitive_array_op_dyn_scalar!(
+ array,
+ scalar,
+ multiply,
+ result_type
+ ),
+ Divide => {
binary_primitive_array_op_dyn_scalar!(array, scalar, divide, result_type)
}
- Operator::Modulo => {
+ Modulo => {
binary_primitive_array_op_dyn_scalar!(array, scalar, modulus, result_type)
}
- Operator::RegexMatch => binary_string_array_flag_op_scalar!(
+ RegexMatch => binary_string_array_flag_op_scalar!(
array,
scalar,
regexp_is_match,
false,
false
),
- Operator::RegexIMatch => binary_string_array_flag_op_scalar!(
+ RegexIMatch => binary_string_array_flag_op_scalar!(
array,
scalar,
regexp_is_match,
false,
true
),
- Operator::RegexNotMatch => binary_string_array_flag_op_scalar!(
+ RegexNotMatch => binary_string_array_flag_op_scalar!(
array,
scalar,
regexp_is_match,
true,
false
),
- Operator::RegexNotIMatch => binary_string_array_flag_op_scalar!(
+ RegexNotIMatch => binary_string_array_flag_op_scalar!(
array,
scalar,
regexp_is_match,
true,
true
),
- Operator::BitwiseAnd => bitwise_and_dyn_scalar(array, scalar),
- Operator::BitwiseOr => bitwise_or_dyn_scalar(array, scalar),
- Operator::BitwiseXor => bitwise_xor_dyn_scalar(array, scalar),
- Operator::BitwiseShiftRight => bitwise_shift_right_dyn_scalar(array, scalar),
- Operator::BitwiseShiftLeft => bitwise_shift_left_dyn_scalar(array, scalar),
+ BitwiseAnd => bitwise_and_dyn_scalar(array, scalar),
+ BitwiseOr => bitwise_or_dyn_scalar(array, scalar),
+ BitwiseXor => bitwise_xor_dyn_scalar(array, scalar),
+ BitwiseShiftRight => bitwise_shift_right_dyn_scalar(array, scalar),
+ BitwiseShiftLeft => bitwise_shift_left_dyn_scalar(array, scalar),
// if scalar operation is not supported - fallback to array implementation
_ => None,
};
@@ -1181,26 +1158,15 @@ impl BinaryExpr {
scalar: ScalarValue,
array: &ArrayRef,
) -> Result<Option<Result<ArrayRef>>> {
+ use Operator::*;
let bool_type = &DataType::Boolean;
let scalar_result = match &self.op {
- Operator::Lt => {
- binary_array_op_dyn_scalar!(array, scalar, gt, bool_type)
- }
- Operator::LtEq => {
- binary_array_op_dyn_scalar!(array, scalar, gt_eq, bool_type)
- }
- Operator::Gt => {
- binary_array_op_dyn_scalar!(array, scalar, lt, bool_type)
- }
- Operator::GtEq => {
- binary_array_op_dyn_scalar!(array, scalar, lt_eq, bool_type)
- }
- Operator::Eq => {
- binary_array_op_dyn_scalar!(array, scalar, eq, bool_type)
- }
- Operator::NotEq => {
- binary_array_op_dyn_scalar!(array, scalar, neq, bool_type)
- }
+ Lt => binary_array_op_dyn_scalar!(array, scalar, gt, bool_type),
+ LtEq => binary_array_op_dyn_scalar!(array, scalar, gt_eq, bool_type),
+ Gt => binary_array_op_dyn_scalar!(array, scalar, lt, bool_type),
+ GtEq => binary_array_op_dyn_scalar!(array, scalar, lt_eq, bool_type),
+ Eq => binary_array_op_dyn_scalar!(array, scalar, eq, bool_type),
+ NotEq => binary_array_op_dyn_scalar!(array, scalar, neq, bool_type),
// if scalar operation is not supported - fallback to array implementation
_ => None,
};
@@ -1215,14 +1181,15 @@ impl BinaryExpr {
right_data_type: &DataType,
result_type: &DataType,
) -> Result<ArrayRef> {
+ use Operator::*;
match &self.op {
- Operator::Lt => lt_dyn(&left, &right),
- Operator::LtEq => lt_eq_dyn(&left, &right),
- Operator::Gt => gt_dyn(&left, &right),
- Operator::GtEq => gt_eq_dyn(&left, &right),
- Operator::Eq => eq_dyn(&left, &right),
- Operator::NotEq => neq_dyn(&left, &right),
- Operator::IsDistinctFrom => {
+ Lt => lt_dyn(&left, &right),
+ LtEq => lt_eq_dyn(&left, &right),
+ Gt => gt_dyn(&left, &right),
+ GtEq => gt_eq_dyn(&left, &right),
+ Eq => eq_dyn(&left, &right),
+ NotEq => neq_dyn(&left, &right),
+ IsDistinctFrom => {
match (left_data_type, right_data_type) {
// exchange lhs and rhs when lhs is Null, since `binary_array_op` is
// always try to down cast array according to $LEFT expression.
@@ -1232,25 +1199,21 @@ impl BinaryExpr {
_ => binary_array_op!(left, right, is_distinct_from),
}
}
- Operator::IsNotDistinctFrom => {
- binary_array_op!(left, right, is_not_distinct_from)
- }
- Operator::Plus => {
- binary_primitive_array_op_dyn!(left, right, add_dyn, result_type)
- }
- Operator::Minus => {
+ IsNotDistinctFrom => binary_array_op!(left, right, is_not_distinct_from),
+ Plus => binary_primitive_array_op_dyn!(left, right, add_dyn, result_type),
+ Minus => {
binary_primitive_array_op_dyn!(left, right, subtract_dyn, result_type)
}
- Operator::Multiply => {
+ Multiply => {
binary_primitive_array_op_dyn!(left, right, multiply_dyn, result_type)
}
- Operator::Divide => {
+ Divide => {
binary_primitive_array_op_dyn!(left, right, divide_dyn_opt, result_type)
}
- Operator::Modulo => {
+ Modulo => {
binary_primitive_array_op_dyn!(left, right, modulus_dyn, result_type)
}
- Operator::And => {
+ And => {
if left_data_type == &DataType::Boolean {
boolean_op!(&left, &right, and_kleene)
} else {
@@ -1262,7 +1225,7 @@ impl BinaryExpr {
)))
}
}
- Operator::Or => {
+ Or => {
if left_data_type == &DataType::Boolean {
boolean_op!(&left, &right, or_kleene)
} else {
@@ -1272,24 +1235,24 @@ impl BinaryExpr {
)))
}
}
- Operator::RegexMatch => {
+ RegexMatch => {
binary_string_array_flag_op!(left, right, regexp_is_match, false, false)
}
- Operator::RegexIMatch => {
+ RegexIMatch => {
binary_string_array_flag_op!(left, right, regexp_is_match, false, true)
}
- Operator::RegexNotMatch => {
+ RegexNotMatch => {
binary_string_array_flag_op!(left, right, regexp_is_match, true, false)
}
- Operator::RegexNotIMatch => {
+ RegexNotIMatch => {
binary_string_array_flag_op!(left, right, regexp_is_match, true, true)
}
- Operator::BitwiseAnd => bitwise_and_dyn(left, right),
- Operator::BitwiseOr => bitwise_or_dyn(left, right),
- Operator::BitwiseXor => bitwise_xor_dyn(left, right),
- Operator::BitwiseShiftRight => bitwise_shift_right_dyn(left, right),
- Operator::BitwiseShiftLeft => bitwise_shift_left_dyn(left, right),
- Operator::StringConcat => {
+ BitwiseAnd => bitwise_and_dyn(left, right),
+ BitwiseOr => bitwise_or_dyn(left, right),
+ BitwiseXor => bitwise_xor_dyn(left, right),
+ BitwiseShiftRight => bitwise_shift_right_dyn(left, right),
+ BitwiseShiftLeft => bitwise_shift_left_dyn(left, right),
+ StringConcat => {
binary_string_array_op!(left, right, concat_elements)
}
}
@@ -1322,25 +1285,6 @@ pub fn binary(
Ok(Arc::new(BinaryExpr::new(lhs, op, rhs)))
}
-macro_rules! sub_timestamp_macro {
- ($array:expr, $rhs:expr, $caster:expr, $interval_type:ty, $opt_tz_lhs:expr, $multiplier:expr,
- $opt_tz_rhs:expr, $unit_sub:expr, $naive_sub_fn:expr, $counter:expr) => {{
- let prim_array = $caster(&$array)?;
- let ret: PrimitiveArray<$interval_type> = try_unary(prim_array, |lhs| {
- let (parsed_lhs_tz, parsed_rhs_tz) =
- (parse_timezones($opt_tz_lhs)?, parse_timezones($opt_tz_rhs)?);
- let (naive_lhs, naive_rhs) = calculate_naives::<$unit_sub>(
- lhs.mul_wrapping($multiplier),
- parsed_lhs_tz,
- $rhs.mul_wrapping($multiplier),
- parsed_rhs_tz,
- )?;
- Ok($naive_sub_fn($counter(&naive_lhs), $counter(&naive_rhs)))
- })?;
- Arc::new(ret) as ArrayRef
- }};
-}
-
pub fn resolve_temporal_op(
lhs: &ArrayRef,
sign: i32,
@@ -1356,795 +1300,19 @@ pub fn resolve_temporal_op(
}
pub fn resolve_temporal_op_scalar(
- lhs: &ArrayRef,
- sign: i32,
- rhs: &ScalarValue,
-) -> Result<ColumnarValue> {
- match sign {
- 1 => add_dyn_temporal_scalar(lhs, rhs),
- -1 => subtract_dyn_temporal_scalar(lhs, rhs),
- other => Err(DataFusionError::Internal(format!(
- "Undefined operation for temporal types {other}"
- ))),
- }
-}
-
-/// This function handles the Timestamp - Timestamp operations,
-/// where the first one is an array, and the second one is a scalar,
-/// hence the result is also an array.
-pub fn ts_scalar_ts_op(array: &ArrayRef, scalar: &ScalarValue) -> Result<ColumnarValue> {
- let ret = match (array.data_type(), scalar) {
- (
- DataType::Timestamp(TimeUnit::Second, opt_tz_lhs),
- ScalarValue::TimestampSecond(Some(rhs), opt_tz_rhs),
- ) => {
- sub_timestamp_macro!(
- array,
- rhs,
- as_timestamp_second_array,
- IntervalDayTimeType,
- opt_tz_lhs.as_deref(),
- 1000,
- opt_tz_rhs.as_deref(),
- MILLISECOND_MODE,
- seconds_sub,
- NaiveDateTime::timestamp
- )
- }
- (
- DataType::Timestamp(TimeUnit::Millisecond, opt_tz_lhs),
- ScalarValue::TimestampMillisecond(Some(rhs), opt_tz_rhs),
- ) => {
- sub_timestamp_macro!(
- array,
- rhs,
- as_timestamp_millisecond_array,
- IntervalDayTimeType,
- opt_tz_lhs.as_deref(),
- 1,
- opt_tz_rhs.as_deref(),
- MILLISECOND_MODE,
- milliseconds_sub,
- NaiveDateTime::timestamp_millis
- )
- }
- (
- DataType::Timestamp(TimeUnit::Microsecond, opt_tz_lhs),
- ScalarValue::TimestampMicrosecond(Some(rhs), opt_tz_rhs),
- ) => {
- sub_timestamp_macro!(
- array,
- rhs,
- as_timestamp_microsecond_array,
- IntervalMonthDayNanoType,
- opt_tz_lhs.as_deref(),
- 1000,
- opt_tz_rhs.as_deref(),
- NANOSECOND_MODE,
- microseconds_sub,
- NaiveDateTime::timestamp_micros
- )
- }
- (
- DataType::Timestamp(TimeUnit::Nanosecond, opt_tz_lhs),
- ScalarValue::TimestampNanosecond(Some(rhs), opt_tz_rhs),
- ) => {
- sub_timestamp_macro!(
- array,
- rhs,
- as_timestamp_nanosecond_array,
- IntervalMonthDayNanoType,
- opt_tz_lhs.as_deref(),
- 1,
- opt_tz_rhs.as_deref(),
- NANOSECOND_MODE,
- nanoseconds_sub,
- NaiveDateTime::timestamp_nanos
- )
- }
- (_, _) => {
- return Err(DataFusionError::Internal(format!(
- "Invalid array - scalar types for Timestamp subtraction: {:?} - {:?}",
- array.data_type(),
- scalar.get_datatype()
- )));
- }
- };
- Ok(ColumnarValue::Array(ret))
-}
-
-macro_rules! sub_timestamp_interval_macro {
- ($array:expr, $as_timestamp:expr, $ts_type:ty, $fn_op:expr, $scalar:expr, $sign:expr, $tz:expr) => {{
- let array = $as_timestamp(&$array)?;
- let ret: PrimitiveArray<$ts_type> =
- try_unary::<$ts_type, _, $ts_type>(array, |ts_s| {
- Ok($fn_op(ts_s, $scalar, $sign)?)
- })?;
- Arc::new(ret.with_timezone_opt($tz.clone())) as ArrayRef
- }};
-}
-/// This function handles the Timestamp - Interval operations,
-/// where the first one is an array, and the second one is a scalar,
-/// hence the result is also an array.
-pub fn ts_scalar_interval_op(
- array: &ArrayRef,
- sign: i32,
- scalar: &ScalarValue,
-) -> Result<ColumnarValue> {
- let ret = match array.data_type() {
- DataType::Timestamp(TimeUnit::Second, tz) => {
- sub_timestamp_interval_macro!(
- array,
- as_timestamp_second_array,
- TimestampSecondType,
- seconds_add,
- scalar,
- sign,
- tz
- )
- }
- DataType::Timestamp(TimeUnit::Millisecond, tz) => {
- sub_timestamp_interval_macro!(
- array,
- as_timestamp_millisecond_array,
- TimestampMillisecondType,
- milliseconds_add,
- scalar,
- sign,
- tz
- )
- }
- DataType::Timestamp(TimeUnit::Microsecond, tz) => {
- sub_timestamp_interval_macro!(
- array,
- as_timestamp_microsecond_array,
- TimestampMicrosecondType,
- microseconds_add,
- scalar,
- sign,
- tz
- )
- }
- DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
- sub_timestamp_interval_macro!(
- array,
- as_timestamp_nanosecond_array,
- TimestampNanosecondType,
- nanoseconds_add,
- scalar,
- sign,
- tz
- )
- }
- _ => Err(DataFusionError::Internal(format!(
- "Invalid lhs type for Timestamp vs Interval operations: {}",
- array.data_type()
- )))?,
- };
- Ok(ColumnarValue::Array(ret))
-}
-
-macro_rules! sub_interval_macro {
- ($array:expr, $as_interval:expr, $interval_type:ty, $fn_op:expr, $scalar:expr, $sign:expr) => {{
- let array = $as_interval(&$array)?;
- let ret: PrimitiveArray<$interval_type> =
- unary(array, |lhs| $fn_op(lhs, *$scalar, $sign));
- Arc::new(ret) as ArrayRef
- }};
-}
-macro_rules! sub_interval_cross_macro {
- ($array:expr, $as_interval:expr, $commute:expr, $fn_op:expr, $scalar:expr, $sign:expr, $t1:ty, $t2:ty) => {{
- let array = $as_interval(&$array)?;
- let ret: PrimitiveArray<IntervalMonthDayNanoType> = if $commute {
- unary(array, |lhs| {
- $fn_op(*$scalar as $t1, lhs as $t2, $sign, $commute)
- })
- } else {
- unary(array, |lhs| {
- $fn_op(lhs as $t1, *$scalar as $t2, $sign, $commute)
- })
- };
- Arc::new(ret) as ArrayRef
- }};
-}
-/// This function handles the Interval - Interval operations,
-/// where the first one is an array, and the second one is a scalar,
-/// hence the result is also an interval array.
-pub fn interval_scalar_interval_op(
- array: &ArrayRef,
+ arr: &ArrayRef,
sign: i32,
scalar: &ScalarValue,
-) -> Result<ColumnarValue> {
- let ret = match (array.data_type(), scalar) {
- (
- DataType::Interval(IntervalUnit::YearMonth),
- ScalarValue::IntervalYearMonth(Some(rhs)),
- ) => {
- sub_interval_macro!(
- array,
- as_interval_ym_array,
- IntervalYearMonthType,
- op_ym,
- rhs,
- sign
- )
- }
- (
- DataType::Interval(IntervalUnit::YearMonth),
- ScalarValue::IntervalDayTime(Some(rhs)),
- ) => {
- sub_interval_cross_macro!(
- array,
- as_interval_ym_array,
- false,
- op_ym_dt,
- rhs,
- sign,
- i32,
- i64
- )
- }
- (
- DataType::Interval(IntervalUnit::YearMonth),
- ScalarValue::IntervalMonthDayNano(Some(rhs)),
- ) => {
- sub_interval_cross_macro!(
- array,
- as_interval_ym_array,
- false,
- op_ym_mdn,
- rhs,
- sign,
- i32,
- i128
- )
- }
- (
- DataType::Interval(IntervalUnit::DayTime),
- ScalarValue::IntervalYearMonth(Some(rhs)),
- ) => {
- sub_interval_cross_macro!(
- array,
- as_interval_dt_array,
- true,
- op_ym_dt,
- rhs,
- sign,
- i32,
- i64
- )
- }
- (
- DataType::Interval(IntervalUnit::DayTime),
- ScalarValue::IntervalDayTime(Some(rhs)),
- ) => {
- sub_interval_macro!(
- array,
- as_interval_dt_array,
- IntervalDayTimeType,
- op_dt,
- rhs,
- sign
- )
- }
- (
- DataType::Interval(IntervalUnit::DayTime),
- ScalarValue::IntervalMonthDayNano(Some(rhs)),
- ) => {
- sub_interval_cross_macro!(
- array,
- as_interval_dt_array,
- false,
- op_dt_mdn,
- rhs,
- sign,
- i64,
- i128
- )
- }
- (
- DataType::Interval(IntervalUnit::MonthDayNano),
- ScalarValue::IntervalYearMonth(Some(rhs)),
- ) => {
- sub_interval_cross_macro!(
- array,
- as_interval_mdn_array,
- true,
- op_ym_mdn,
- rhs,
- sign,
- i32,
- i128
- )
- }
- (
- DataType::Interval(IntervalUnit::MonthDayNano),
- ScalarValue::IntervalDayTime(Some(rhs)),
- ) => {
- sub_interval_cross_macro!(
- array,
- as_interval_mdn_array,
- true,
- op_dt_mdn,
- rhs,
- sign,
- i64,
- i128
- )
- }
- (
- DataType::Interval(IntervalUnit::MonthDayNano),
- ScalarValue::IntervalMonthDayNano(Some(rhs)),
- ) => {
- sub_interval_macro!(
- array,
- as_interval_mdn_array,
- IntervalMonthDayNanoType,
- op_mdn,
- rhs,
- sign
- )
- }
- _ => Err(DataFusionError::Internal(format!(
- "Invalid operands for Interval vs Interval operations: {} - {}",
- array.data_type(),
- scalar.get_datatype(),
- )))?,
- };
- Ok(ColumnarValue::Array(ret))
-}
-
-// Macros related with timestamp & interval operations
-macro_rules! ts_sub_op {
- ($lhs:ident, $rhs:ident, $lhs_tz:ident, $rhs_tz:ident, $coef:expr, $caster:expr, $op:expr, $ts_unit:expr, $mode:expr, $type_out:ty) => {{
- let prim_array_lhs = $caster(&$lhs)?;
- let prim_array_rhs = $caster(&$rhs)?;
- let ret: PrimitiveArray<$type_out> =
- arrow::compute::try_binary(prim_array_lhs, prim_array_rhs, |ts1, ts2| {
- let (parsed_lhs_tz, parsed_rhs_tz) = (
- parse_timezones($lhs_tz.as_deref())?,
- parse_timezones($rhs_tz.as_deref())?,
- );
- let (naive_lhs, naive_rhs) = calculate_naives::<$mode>(
- ts1.mul_wrapping($coef),
- parsed_lhs_tz,
- ts2.mul_wrapping($coef),
- parsed_rhs_tz,
- )?;
- Ok($op($ts_unit(&naive_lhs), $ts_unit(&naive_rhs)))
- })?;
- Arc::new(ret) as ArrayRef
- }};
-}
-macro_rules! interval_op {
- ($lhs:ident, $rhs:ident, $caster:expr, $op:expr, $sign:ident, $type_in:ty) => {{
- let prim_array_lhs = $caster(&$lhs)?;
- let prim_array_rhs = $caster(&$rhs)?;
- let ret = Arc::new(arrow::compute::binary::<$type_in, $type_in, _, $type_in>(
- prim_array_lhs,
- prim_array_rhs,
- |interval1, interval2| $op(interval1, interval2, $sign),
- )?) as ArrayRef;
- ret
- }};
-}
-macro_rules! interval_cross_op {
- ($lhs:ident, $rhs:ident, $caster1:expr, $caster2:expr, $op:expr, $sign:ident, $commute:ident, $type_in1:ty, $type_in2:ty) => {{
- let prim_array_lhs = $caster1(&$lhs)?;
- let prim_array_rhs = $caster2(&$rhs)?;
- let ret = Arc::new(arrow::compute::binary::<
- $type_in1,
- $type_in2,
- _,
- IntervalMonthDayNanoType,
- >(
- prim_array_lhs,
- prim_array_rhs,
- |interval1, interval2| $op(interval1, interval2, $sign, $commute),
- )?) as ArrayRef;
- ret
- }};
-}
-macro_rules! ts_interval_op {
- ($lhs:ident, $rhs:ident, $tz:ident, $caster1:expr, $caster2:expr, $op:expr, $sign:ident, $type_in1:ty, $type_in2:ty) => {{
- let prim_array_lhs = $caster1(&$lhs)?;
- let prim_array_rhs = $caster2(&$rhs)?;
- let ret: PrimitiveArray<$type_in1> = arrow::compute::try_binary(
- prim_array_lhs,
- prim_array_rhs,
- |ts, interval| Ok($op(ts, interval as i128, $sign)?),
- )?;
- Arc::new(ret.with_timezone_opt($tz.clone())) as ArrayRef
- }};
-}
-
-/// Performs a timestamp subtraction operation on two arrays and returns the resulting array.
-pub fn ts_array_op(array_lhs: &ArrayRef, array_rhs: &ArrayRef) -> Result<ArrayRef> {
- match (array_lhs.data_type(), array_rhs.data_type()) {
- (
- DataType::Timestamp(TimeUnit::Second, opt_tz_lhs),
- DataType::Timestamp(TimeUnit::Second, opt_tz_rhs),
- ) => Ok(ts_sub_op!(
- array_lhs,
- array_rhs,
- opt_tz_lhs,
- opt_tz_rhs,
- 1000i64,
- as_timestamp_second_array,
- seconds_sub,
- NaiveDateTime::timestamp,
- MILLISECOND_MODE,
- IntervalDayTimeType
- )),
- (
- DataType::Timestamp(TimeUnit::Millisecond, opt_tz_lhs),
- DataType::Timestamp(TimeUnit::Millisecond, opt_tz_rhs),
- ) => Ok(ts_sub_op!(
- array_lhs,
- array_rhs,
- opt_tz_lhs,
- opt_tz_rhs,
- 1i64,
- as_timestamp_millisecond_array,
- milliseconds_sub,
- NaiveDateTime::timestamp_millis,
- MILLISECOND_MODE,
- IntervalDayTimeType
- )),
- (
- DataType::Timestamp(TimeUnit::Microsecond, opt_tz_lhs),
- DataType::Timestamp(TimeUnit::Microsecond, opt_tz_rhs),
- ) => Ok(ts_sub_op!(
- array_lhs,
- array_rhs,
- opt_tz_lhs,
- opt_tz_rhs,
- 1000i64,
- as_timestamp_microsecond_array,
- microseconds_sub,
- NaiveDateTime::timestamp_micros,
- NANOSECOND_MODE,
- IntervalMonthDayNanoType
- )),
- (
- DataType::Timestamp(TimeUnit::Nanosecond, opt_tz_lhs),
- DataType::Timestamp(TimeUnit::Nanosecond, opt_tz_rhs),
- ) => Ok(ts_sub_op!(
- array_lhs,
- array_rhs,
- opt_tz_lhs,
- opt_tz_rhs,
- 1i64,
- as_timestamp_nanosecond_array,
- nanoseconds_sub,
- NaiveDateTime::timestamp_nanos,
- NANOSECOND_MODE,
- IntervalMonthDayNanoType
- )),
- (_, _) => Err(DataFusionError::Execution(format!(
- "Invalid array types for Timestamp subtraction: {} - {}",
- array_lhs.data_type(),
- array_rhs.data_type()
- ))),
- }
-}
-/// Performs an interval operation on two arrays and returns the resulting array.
-/// The operation sign determines whether to perform addition or subtraction.
-/// The data type and unit of the two input arrays must match the supported combinations.
-pub fn interval_array_op(
- array_lhs: &ArrayRef,
- array_rhs: &ArrayRef,
- sign: i32,
+ swap: bool,
) -> Result<ArrayRef> {
- match (array_lhs.data_type(), array_rhs.data_type()) {
- (
- DataType::Interval(IntervalUnit::YearMonth),
- DataType::Interval(IntervalUnit::YearMonth),
- ) => Ok(interval_op!(
- array_lhs,
- array_rhs,
- as_interval_ym_array,
- op_ym,
- sign,
- IntervalYearMonthType
- )),
- (
- DataType::Interval(IntervalUnit::YearMonth),
- DataType::Interval(IntervalUnit::DayTime),
- ) => Ok(interval_cross_op!(
- array_lhs,
- array_rhs,
- as_interval_ym_array,
- as_interval_dt_array,
- op_ym_dt,
- sign,
- false,
- IntervalYearMonthType,
- IntervalDayTimeType
- )),
- (
- DataType::Interval(IntervalUnit::YearMonth),
- DataType::Interval(IntervalUnit::MonthDayNano),
- ) => Ok(interval_cross_op!(
- array_lhs,
- array_rhs,
- as_interval_ym_array,
- as_interval_mdn_array,
- op_ym_mdn,
- sign,
- false,
- IntervalYearMonthType,
- IntervalMonthDayNanoType
- )),
- (
- DataType::Interval(IntervalUnit::DayTime),
- DataType::Interval(IntervalUnit::YearMonth),
- ) => Ok(interval_cross_op!(
- array_rhs,
- array_lhs,
- as_interval_ym_array,
- as_interval_dt_array,
- op_ym_dt,
- sign,
- true,
- IntervalYearMonthType,
- IntervalDayTimeType
- )),
- (
- DataType::Interval(IntervalUnit::DayTime),
- DataType::Interval(IntervalUnit::DayTime),
- ) => Ok(interval_op!(
- array_lhs,
- array_rhs,
- as_interval_dt_array,
- op_dt,
- sign,
- IntervalDayTimeType
- )),
- (
- DataType::Interval(IntervalUnit::DayTime),
- DataType::Interval(IntervalUnit::MonthDayNano),
- ) => Ok(interval_cross_op!(
- array_lhs,
- array_rhs,
- as_interval_dt_array,
- as_interval_mdn_array,
- op_dt_mdn,
- sign,
- false,
- IntervalDayTimeType,
- IntervalMonthDayNanoType
- )),
- (
- DataType::Interval(IntervalUnit::MonthDayNano),
- DataType::Interval(IntervalUnit::YearMonth),
- ) => Ok(interval_cross_op!(
- array_rhs,
- array_lhs,
- as_interval_ym_array,
- as_interval_mdn_array,
- op_ym_mdn,
- sign,
- true,
- IntervalYearMonthType,
- IntervalMonthDayNanoType
+ match (sign, swap) {
+ (1, false) => add_dyn_temporal_right_scalar(arr, scalar),
+ (1, true) => add_dyn_temporal_left_scalar(scalar, arr),
+ (-1, false) => subtract_dyn_temporal_right_scalar(arr, scalar),
+ (-1, true) => subtract_dyn_temporal_left_scalar(scalar, arr),
+ _ => Err(DataFusionError::Internal(
+ "Undefined operation for temporal types".to_string(),
)),
- (
- DataType::Interval(IntervalUnit::MonthDayNano),
- DataType::Interval(IntervalUnit::DayTime),
- ) => Ok(interval_cross_op!(
- array_rhs,
- array_lhs,
- as_interval_dt_array,
- as_interval_mdn_array,
- op_dt_mdn,
- sign,
- true,
- IntervalDayTimeType,
- IntervalMonthDayNanoType
- )),
- (
- DataType::Interval(IntervalUnit::MonthDayNano),
- DataType::Interval(IntervalUnit::MonthDayNano),
- ) => Ok(interval_op!(
- array_lhs,
- array_rhs,
- as_interval_mdn_array,
- op_mdn,
- sign,
- IntervalMonthDayNanoType
- )),
- (_, _) => Err(DataFusionError::Execution(format!(
- "Invalid array types for Interval operation: {} {} {}",
- array_lhs.data_type(),
- sign,
- array_rhs.data_type()
- ))),
- }
-}
-/// Performs a timestamp/interval operation on two arrays and returns the resulting array.
-/// The operation sign determines whether to perform addition or subtraction.
-/// The data type and unit of the two input arrays must match the supported combinations.
-pub fn ts_interval_array_op(
- array_lhs: &ArrayRef,
- sign: i32,
- array_rhs: &ArrayRef,
-) -> Result<ArrayRef> {
- match (array_lhs.data_type(), array_rhs.data_type()) {
- (
- DataType::Timestamp(TimeUnit::Second, tz),
- DataType::Interval(IntervalUnit::YearMonth),
- ) => Ok(ts_interval_op!(
- array_lhs,
- array_rhs,
- tz,
- as_timestamp_second_array,
- as_interval_ym_array,
- seconds_add_array::<YM_MODE>,
- sign,
- TimestampSecondType,
- IntervalYearMonthType
- )),
- (
- DataType::Timestamp(TimeUnit::Second, tz),
- DataType::Interval(IntervalUnit::DayTime),
- ) => Ok(ts_interval_op!(
- array_lhs,
- array_rhs,
- tz,
- as_timestamp_second_array,
- as_interval_dt_array,
- seconds_add_array::<DT_MODE>,
- sign,
- TimestampSecondType,
- IntervalDayTimeType
- )),
- (
- DataType::Timestamp(TimeUnit::Second, tz),
- DataType::Interval(IntervalUnit::MonthDayNano),
- ) => Ok(ts_interval_op!(
- array_lhs,
- array_rhs,
- tz,
- as_timestamp_second_array,
- as_interval_mdn_array,
- seconds_add_array::<MDN_MODE>,
- sign,
- TimestampSecondType,
- IntervalMonthDayNanoType
- )),
- (
- DataType::Timestamp(TimeUnit::Millisecond, tz),
- DataType::Interval(IntervalUnit::YearMonth),
- ) => Ok(ts_interval_op!(
- array_lhs,
- array_rhs,
- tz,
- as_timestamp_millisecond_array,
- as_interval_ym_array,
- milliseconds_add_array::<YM_MODE>,
- sign,
- TimestampMillisecondType,
- IntervalYearMonthType
- )),
- (
- DataType::Timestamp(TimeUnit::Millisecond, tz),
- DataType::Interval(IntervalUnit::DayTime),
- ) => Ok(ts_interval_op!(
- array_lhs,
- array_rhs,
- tz,
- as_timestamp_millisecond_array,
- as_interval_dt_array,
- milliseconds_add_array::<DT_MODE>,
- sign,
- TimestampMillisecondType,
- IntervalDayTimeType
- )),
- (
- DataType::Timestamp(TimeUnit::Millisecond, tz),
- DataType::Interval(IntervalUnit::MonthDayNano),
- ) => Ok(ts_interval_op!(
- array_lhs,
- array_rhs,
- tz,
- as_timestamp_millisecond_array,
- as_interval_mdn_array,
- milliseconds_add_array::<MDN_MODE>,
- sign,
- TimestampMillisecondType,
- IntervalMonthDayNanoType
- )),
- (
- DataType::Timestamp(TimeUnit::Microsecond, tz),
- DataType::Interval(IntervalUnit::YearMonth),
- ) => Ok(ts_interval_op!(
- array_lhs,
- array_rhs,
- tz,
- as_timestamp_microsecond_array,
- as_interval_ym_array,
- microseconds_add_array::<YM_MODE>,
- sign,
- TimestampMicrosecondType,
- IntervalYearMonthType
- )),
- (
- DataType::Timestamp(TimeUnit::Microsecond, tz),
- DataType::Interval(IntervalUnit::DayTime),
- ) => Ok(ts_interval_op!(
- array_lhs,
- array_rhs,
- tz,
- as_timestamp_microsecond_array,
- as_interval_dt_array,
- microseconds_add_array::<DT_MODE>,
- sign,
- TimestampMicrosecondType,
- IntervalDayTimeType
- )),
- (
- DataType::Timestamp(TimeUnit::Microsecond, tz),
- DataType::Interval(IntervalUnit::MonthDayNano),
- ) => Ok(ts_interval_op!(
- array_lhs,
- array_rhs,
- tz,
- as_timestamp_microsecond_array,
- as_interval_mdn_array,
- microseconds_add_array::<MDN_MODE>,
- sign,
- TimestampMicrosecondType,
- IntervalMonthDayNanoType
- )),
- (
- DataType::Timestamp(TimeUnit::Nanosecond, tz),
- DataType::Interval(IntervalUnit::YearMonth),
- ) => Ok(ts_interval_op!(
- array_lhs,
- array_rhs,
- tz,
- as_timestamp_nanosecond_array,
- as_interval_ym_array,
- nanoseconds_add_array::<YM_MODE>,
- sign,
- TimestampNanosecondType,
- IntervalYearMonthType
- )),
- (
- DataType::Timestamp(TimeUnit::Nanosecond, tz),
- DataType::Interval(IntervalUnit::DayTime),
- ) => Ok(ts_interval_op!(
- array_lhs,
- array_rhs,
- tz,
- as_timestamp_nanosecond_array,
- as_interval_dt_array,
- nanoseconds_add_array::<DT_MODE>,
- sign,
- TimestampNanosecondType,
- IntervalDayTimeType
- )),
- (
- DataType::Timestamp(TimeUnit::Nanosecond, tz),
- DataType::Interval(IntervalUnit::MonthDayNano),
- ) => Ok(ts_interval_op!(
- array_lhs,
- array_rhs,
- tz,
- as_timestamp_nanosecond_array,
- as_interval_mdn_array,
- nanoseconds_add_array::<MDN_MODE>,
- sign,
- TimestampNanosecondType,
- IntervalMonthDayNanoType
- )),
- (_, _) => Err(DataFusionError::Execution(format!(
- "Invalid array types for Timestamp Interval operation: {} {} {}",
- array_lhs.data_type(),
- sign,
- array_rhs.data_type()
- ))),
}
}
diff --git a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
index 50a9f86c06..4d984ac8e8 100644
--- a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
+++ b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
@@ -31,19 +31,35 @@ use arrow::{array::*, datatypes::ArrowNumericType, downcast_dictionary_array};
use arrow_array::types::{ArrowDictionaryKeyType, DecimalType};
use arrow_array::ArrowNativeTypeOp;
use arrow_buffer::ArrowNativeType;
-use arrow_schema::DataType;
+use arrow_schema::{DataType, IntervalUnit};
+use chrono::{Days, Duration, Months, NaiveDate, NaiveDateTime};
use datafusion_common::cast::{as_date32_array, as_date64_array, as_decimal128_array};
-use datafusion_common::scalar::{date32_add, date64_add};
+use datafusion_common::scalar::{date32_op, date64_op};
use datafusion_common::{DataFusionError, Result, ScalarValue};
-use datafusion_expr::ColumnarValue;
use std::cmp::min;
+use std::ops::Add;
use std::sync::Arc;
-use super::{
- interval_array_op, interval_scalar_interval_op, ts_array_op, ts_interval_array_op,
- ts_scalar_interval_op, ts_scalar_ts_op,
+use arrow::compute::unary;
+use arrow::datatypes::*;
+
+use arrow_array::temporal_conversions::{MILLISECONDS_IN_DAY, NANOSECONDS_IN_DAY};
+use datafusion_common::delta::shift_months;
+use datafusion_common::scalar::{
+ calculate_naives, microseconds_add, microseconds_sub, milliseconds_add,
+ milliseconds_sub, nanoseconds_add, nanoseconds_sub, op_dt, op_dt_mdn, op_mdn, op_ym,
+ op_ym_dt, op_ym_mdn, parse_timezones, seconds_add, MILLISECOND_MODE, NANOSECOND_MODE,
};
+use arrow::datatypes::TimeUnit;
+
+use datafusion_common::cast::{
+ as_interval_dt_array, as_interval_mdn_array, as_interval_ym_array,
+ as_timestamp_microsecond_array, as_timestamp_millisecond_array,
+ as_timestamp_nanosecond_array, as_timestamp_second_array,
+};
+use datafusion_common::scalar::*;
+
// Simple (low performance) kernels until optimized kernels are added to arrow
// See https://github.com/apache/arrow-rs/issues/960
@@ -333,34 +349,88 @@ pub(crate) fn add_dyn_temporal(left: &ArrayRef, right: &ArrayRef) -> Result<Arra
}
}
-pub(crate) fn add_dyn_temporal_scalar(
+pub(crate) fn add_dyn_temporal_right_scalar(
left: &ArrayRef,
right: &ScalarValue,
-) -> Result<ColumnarValue> {
+) -> Result<ArrayRef> {
match (left.data_type(), right.get_datatype()) {
+ // Date32 + Interval
(DataType::Date32, DataType::Interval(..)) => {
let left = as_date32_array(&left)?;
let ret = Arc::new(try_unary::<Date32Type, _, Date32Type>(left, |days| {
- Ok(date32_add(days, right, 1)?)
- })?) as ArrayRef;
- Ok(ColumnarValue::Array(ret))
+ Ok(date32_op(days, right, 1)?)
+ })?) as _;
+ Ok(ret)
}
+ // Date64 + Interval
(DataType::Date64, DataType::Interval(..)) => {
let left = as_date64_array(&left)?;
let ret = Arc::new(try_unary::<Date64Type, _, Date64Type>(left, |ms| {
- Ok(date64_add(ms, right, 1)?)
- })?) as ArrayRef;
- Ok(ColumnarValue::Array(ret))
+ Ok(date64_op(ms, right, 1)?)
+ })?) as _;
+ Ok(ret)
+ }
+ // Interval + Interval
+ (DataType::Interval(..), DataType::Interval(..)) => {
+ interval_op_scalar_interval(left, 1, right)
+ }
+ // Timestamp + Interval
+ (DataType::Timestamp(..), DataType::Interval(..)) => {
+ ts_op_scalar_interval(left, 1, right)
+ }
+ _ => {
+ // fall back to kernels in arrow-rs
+ Ok(add_dyn(left, &right.to_array())?)
+ }
+ }
+}
+
+pub(crate) fn add_dyn_temporal_left_scalar(
+ left: &ScalarValue,
+ right: &ArrayRef,
+) -> Result<ArrayRef> {
+ match (left.get_datatype(), right.data_type()) {
+ // Date32 + Interval
+ (DataType::Date32, DataType::Interval(..)) => {
+ if let ScalarValue::Date32(Some(left)) = left {
+ scalar_date32_array_interval_op(
+ *left,
+ right,
+ NaiveDate::checked_add_days,
+ NaiveDate::checked_add_months,
+ )
+ } else {
+ Err(DataFusionError::Internal(
+ "Date32 value is None".to_string(),
+ ))
+ }
}
+ // Date64 + Interval
+ (DataType::Date64, DataType::Interval(..)) => {
+ if let ScalarValue::Date64(Some(left)) = left {
+ scalar_date64_array_interval_op(
+ *left,
+ right,
+ NaiveDate::checked_add_days,
+ NaiveDate::checked_add_months,
+ )
+ } else {
+ Err(DataFusionError::Internal(
+ "Date64 value is None".to_string(),
+ ))
+ }
+ }
+ // Interval + Interval
(DataType::Interval(..), DataType::Interval(..)) => {
- interval_scalar_interval_op(left, 1, right)
+ scalar_interval_op_interval(left, 1, right)
}
+ // Timestamp + Interval
(DataType::Timestamp(..), DataType::Interval(..)) => {
- ts_scalar_interval_op(left, 1, right)
+ scalar_ts_op_interval(left, 1, right)
}
_ => {
// fall back to kernels in arrow-rs
- Ok(ColumnarValue::Array(add_dyn(left, &right.to_array())?))
+ Ok(add_dyn(&left.to_array(), right)?)
}
}
}
@@ -398,41 +468,152 @@ pub(crate) fn subtract_dyn_temporal(
}
}
-pub(crate) fn subtract_dyn_temporal_scalar(
+pub(crate) fn subtract_dyn_temporal_right_scalar(
left: &ArrayRef,
right: &ScalarValue,
-) -> Result<ColumnarValue> {
+) -> Result<ArrayRef> {
match (left.data_type(), right.get_datatype()) {
+ // Date32 - Interval
(DataType::Date32, DataType::Interval(..)) => {
let left = as_date32_array(&left)?;
let ret = Arc::new(try_unary::<Date32Type, _, Date32Type>(left, |days| {
- Ok(date32_add(days, right, -1)?)
- })?) as ArrayRef;
- Ok(ColumnarValue::Array(ret))
+ Ok(date32_op(days, right, -1)?)
+ })?) as _;
+ Ok(ret)
}
+ // Date64 - Interval
(DataType::Date64, DataType::Interval(..)) => {
let left = as_date64_array(&left)?;
let ret = Arc::new(try_unary::<Date64Type, _, Date64Type>(left, |ms| {
- Ok(date64_add(ms, right, -1)?)
- })?) as ArrayRef;
- Ok(ColumnarValue::Array(ret))
+ Ok(date64_op(ms, right, -1)?)
+ })?) as _;
+ Ok(ret)
}
+ // Timestamp - Timestamp
(DataType::Timestamp(..), DataType::Timestamp(..)) => {
- ts_scalar_ts_op(left, right)
+ ts_sub_scalar_ts(left, right)
}
+ // Interval - Interval
(DataType::Interval(..), DataType::Interval(..)) => {
- interval_scalar_interval_op(left, -1, right)
+ interval_op_scalar_interval(left, -1, right)
}
+ // Timestamp - Interval
(DataType::Timestamp(..), DataType::Interval(..)) => {
- ts_scalar_interval_op(left, -1, right)
+ ts_op_scalar_interval(left, -1, right)
}
_ => {
// fall back to kernels in arrow-rs
- Ok(ColumnarValue::Array(subtract_dyn(left, &right.to_array())?))
+ Ok(subtract_dyn(left, &right.to_array())?)
}
}
}
+pub(crate) fn subtract_dyn_temporal_left_scalar(
+ left: &ScalarValue,
+ right: &ArrayRef,
+) -> Result<ArrayRef> {
+ match (left.get_datatype(), right.data_type()) {
+ // Date32 - Interval
+ (DataType::Date32, DataType::Interval(..)) => {
+ if let ScalarValue::Date32(Some(left)) = left {
+ scalar_date32_array_interval_op(
+ *left,
+ right,
+ NaiveDate::checked_sub_days,
+ NaiveDate::checked_sub_months,
+ )
+ } else {
+ Err(DataFusionError::Internal(
+ "Date32 value is None".to_string(),
+ ))
+ }
+ }
+ // Date64 - Interval
+ (DataType::Date64, DataType::Interval(..)) => {
+ if let ScalarValue::Date64(Some(left)) = left {
+ scalar_date64_array_interval_op(
+ *left,
+ right,
+ NaiveDate::checked_sub_days,
+ NaiveDate::checked_sub_months,
+ )
+ } else {
+ Err(DataFusionError::Internal(
+ "Date64 value is None".to_string(),
+ ))
+ }
+ }
+ // Timestamp - Timestamp
+ (DataType::Timestamp(..), DataType::Timestamp(..)) => {
+ scalar_ts_sub_ts(left, right)
+ }
+ // Interval - Interval
+ (DataType::Interval(..), DataType::Interval(..)) => {
+ scalar_interval_op_interval(left, -1, right)
+ }
+ // Timestamp - Interval
+ (DataType::Timestamp(..), DataType::Interval(..)) => {
+ scalar_ts_op_interval(left, -1, right)
+ }
+ _ => {
+ // fall back to kernels in arrow-rs
+ Ok(subtract_dyn(&left.to_array(), right)?)
+ }
+ }
+}
+
+fn scalar_date32_array_interval_op(
+ left: i32,
+ right: &ArrayRef,
+ day_op: fn(NaiveDate, Days) -> Option<NaiveDate>,
+ month_op: fn(NaiveDate, Months) -> Option<NaiveDate>,
+) -> Result<ArrayRef> {
+ let epoch = NaiveDate::from_ymd_opt(1970, 1, 1)
+ .ok_or_else(|| DataFusionError::Execution("Invalid Date entered".to_string()))?;
+ let prior = epoch.add(Duration::days(left as i64));
+ match right.data_type() {
+ DataType::Interval(IntervalUnit::YearMonth) => {
+ date32_interval_ym_op(right, &epoch, &prior, month_op)
+ }
+ DataType::Interval(IntervalUnit::DayTime) => {
+ date32_interval_dt_op(right, &epoch, &prior, day_op)
+ }
+ DataType::Interval(IntervalUnit::MonthDayNano) => {
+ date32_interval_mdn_op(right, &epoch, &prior, day_op, month_op)
+ }
+ _ => Err(DataFusionError::Internal(format!(
+ "Expected type is an interval, but {} is found",
+ right.data_type()
+ ))),
+ }
+}
+
+fn scalar_date64_array_interval_op(
+ left: i64,
+ right: &ArrayRef,
+ day_op: fn(NaiveDate, Days) -> Option<NaiveDate>,
+ month_op: fn(NaiveDate, Months) -> Option<NaiveDate>,
+) -> Result<ArrayRef> {
+ let epoch = NaiveDate::from_ymd_opt(1970, 1, 1)
+ .ok_or_else(|| DataFusionError::Execution("Invalid Date entered".to_string()))?;
+ let prior = epoch.add(Duration::milliseconds(left));
+ match right.data_type() {
+ DataType::Interval(IntervalUnit::YearMonth) => {
+ date64_interval_ym_op(right, &epoch, &prior, month_op)
+ }
+ DataType::Interval(IntervalUnit::DayTime) => {
+ date64_interval_dt_op(right, &epoch, &prior, day_op)
+ }
+ DataType::Interval(IntervalUnit::MonthDayNano) => {
+ date64_interval_mdn_op(right, &epoch, &prior, day_op, month_op)
+ }
+ _ => Err(DataFusionError::Internal(format!(
+ "Expected type is an interval, but {} is found",
+ right.data_type()
+ ))),
+ }
+}
+
fn get_precision_scale(data_type: &DataType) -> Result<(u8, i8)> {
match data_type {
DataType::Decimal128(precision, scale) => Ok((*precision, *scale)),
@@ -704,6 +885,1297 @@ pub(crate) fn modulus_decimal_dyn_scalar(
decimal_array_with_precision_scale(array, precision, scale)
}
+macro_rules! sub_timestamp_macro {
+ ($array:expr, $rhs:expr, $caster:expr, $interval_type:ty, $opt_tz_lhs:expr, $multiplier:expr,
+ $opt_tz_rhs:expr, $unit_sub:expr, $naive_sub_fn:expr, $counter:expr) => {{
+ let prim_array = $caster(&$array)?;
+ let ret: PrimitiveArray<$interval_type> = try_unary(prim_array, |lhs| {
+ let (parsed_lhs_tz, parsed_rhs_tz) =
+ (parse_timezones($opt_tz_lhs)?, parse_timezones($opt_tz_rhs)?);
+ let (naive_lhs, naive_rhs) = calculate_naives::<$unit_sub>(
+ lhs.mul_wrapping($multiplier),
+ parsed_lhs_tz,
+ $rhs.mul_wrapping($multiplier),
+ parsed_rhs_tz,
+ )?;
+ Ok($naive_sub_fn($counter(&naive_lhs), $counter(&naive_rhs)))
+ })?;
+ Arc::new(ret) as _
+ }};
+}
+
+macro_rules! sub_timestamp_left_scalar_macro {
+ ($array:expr, $lhs:expr, $caster:expr, $interval_type:ty, $opt_tz_lhs:expr, $multiplier:expr,
+ $opt_tz_rhs:expr, $unit_sub:expr, $naive_sub_fn:expr, $counter:expr) => {{
+ let prim_array = $caster(&$array)?;
+ let ret: PrimitiveArray<$interval_type> = try_unary(prim_array, |rhs| {
+ let (parsed_lhs_tz, parsed_rhs_tz) =
+ (parse_timezones($opt_tz_lhs)?, parse_timezones($opt_tz_rhs)?);
+ let (naive_lhs, naive_rhs) = calculate_naives::<$unit_sub>(
+ $lhs.mul_wrapping($multiplier),
+ parsed_lhs_tz,
+ rhs.mul_wrapping($multiplier),
+ parsed_rhs_tz,
+ )?;
+ Ok($naive_sub_fn($counter(&naive_lhs), $counter(&naive_rhs)))
+ })?;
+ Arc::new(ret) as _
+ }};
+}
+
+macro_rules! op_timestamp_interval_macro {
+ ($array:expr, $as_timestamp:expr, $ts_type:ty, $fn_op:expr, $scalar:expr, $sign:expr, $tz:expr) => {{
+ let array = $as_timestamp(&$array)?;
+ let ret: PrimitiveArray<$ts_type> =
+ try_unary::<$ts_type, _, $ts_type>(array, |ts_s| {
+ Ok($fn_op(ts_s, $scalar, $sign)?)
+ })?;
+ Arc::new(ret.with_timezone_opt($tz.clone())) as _
+ }};
+}
+
+macro_rules! scalar_ts_op_interval_macro {
+ ($ts:ident, $tz:ident, $interval:ident, $sign:ident,
+ $caster1:expr, $type1:ty, $type2:ty, $op:expr, $back_caster:expr) => {{
+ let interval = $caster1(&$interval)?;
+ let ret: PrimitiveArray<$type1> =
+ try_unary::<$type2, _, $type1>(interval, |e| {
+ let prior = $ts.ok_or_else(|| {
+ DataFusionError::Internal("Timestamp is out-of-range".to_string())
+ })?;
+ Ok($back_caster(&$op(prior, e, $sign)))
+ })?;
+ Arc::new(ret.with_timezone_opt($tz.clone())) as _
+ }};
+}
+
+macro_rules! op_interval_macro {
+ ($array:expr, $as_interval:expr, $interval_type:ty, $fn_op:expr, $scalar:expr, $sign:expr) => {{
+ let array = $as_interval(&$array)?;
+ let ret: PrimitiveArray<$interval_type> =
+ unary(array, |lhs| $fn_op(lhs, *$scalar, $sign));
+ Arc::new(ret) as _
+ }};
+}
+
+macro_rules! op_interval_cross_macro {
+ ($array:expr, $as_interval:expr, $commute:expr, $fn_op:expr, $scalar:expr, $sign:expr, $t1:ty, $t2:ty) => {{
+ let array = $as_interval(&$array)?;
+ let ret: PrimitiveArray<IntervalMonthDayNanoType> = if $commute {
+ unary(array, |lhs| {
+ $fn_op(*$scalar as $t1, lhs as $t2, $sign, $commute)
+ })
+ } else {
+ unary(array, |lhs| {
+ $fn_op(lhs as $t1, *$scalar as $t2, $sign, $commute)
+ })
+ };
+ Arc::new(ret) as _
+ }};
+}
+
+macro_rules! ts_sub_op {
+ ($lhs:ident, $rhs:ident, $lhs_tz:ident, $rhs_tz:ident, $coef:expr, $caster:expr, $op:expr, $ts_unit:expr, $mode:expr, $type_out:ty) => {{
+ let prim_array_lhs = $caster(&$lhs)?;
+ let prim_array_rhs = $caster(&$rhs)?;
+ let ret: PrimitiveArray<$type_out> =
+ arrow::compute::try_binary(prim_array_lhs, prim_array_rhs, |ts1, ts2| {
+ let (parsed_lhs_tz, parsed_rhs_tz) = (
+ parse_timezones($lhs_tz.as_deref())?,
+ parse_timezones($rhs_tz.as_deref())?,
+ );
+ let (naive_lhs, naive_rhs) = calculate_naives::<$mode>(
+ ts1.mul_wrapping($coef),
+ parsed_lhs_tz,
+ ts2.mul_wrapping($coef),
+ parsed_rhs_tz,
+ )?;
+ Ok($op($ts_unit(&naive_lhs), $ts_unit(&naive_rhs)))
+ })?;
+ Arc::new(ret) as _
+ }};
+}
+
+macro_rules! interval_op {
+ ($lhs:ident, $rhs:ident, $caster:expr, $op:expr, $sign:ident, $type_in:ty) => {{
+ let prim_array_lhs = $caster(&$lhs)?;
+ let prim_array_rhs = $caster(&$rhs)?;
+ Arc::new(arrow::compute::binary::<$type_in, $type_in, _, $type_in>(
+ prim_array_lhs,
+ prim_array_rhs,
+ |interval1, interval2| $op(interval1, interval2, $sign),
+ )?) as _
+ }};
+}
+
+macro_rules! interval_cross_op {
+ ($lhs:ident, $rhs:ident, $caster1:expr, $caster2:expr, $op:expr, $sign:ident, $commute:ident, $type_in1:ty, $type_in2:ty) => {{
+ let prim_array_lhs = $caster1(&$lhs)?;
+ let prim_array_rhs = $caster2(&$rhs)?;
+ Arc::new(arrow::compute::binary::<
+ $type_in1,
+ $type_in2,
+ _,
+ IntervalMonthDayNanoType,
+ >(
+ prim_array_lhs,
+ prim_array_rhs,
+ |interval1, interval2| $op(interval1, interval2, $sign, $commute),
+ )?) as _
+ }};
+}
+
+macro_rules! ts_interval_op {
+ ($lhs:ident, $rhs:ident, $tz:ident, $caster1:expr, $caster2:expr, $op:expr, $sign:ident, $type_in1:ty, $type_in2:ty) => {{
+ let prim_array_lhs = $caster1(&$lhs)?;
+ let prim_array_rhs = $caster2(&$rhs)?;
+ let ret: PrimitiveArray<$type_in1> = arrow::compute::try_binary(
+ prim_array_lhs,
+ prim_array_rhs,
+ |ts, interval| Ok($op(ts, interval as i128, $sign)?),
+ )?;
+ Arc::new(ret.with_timezone_opt($tz.clone())) as _
+ }};
+}
+
+/// This function handles timestamp - timestamp operations where the former is
+/// an array and the latter is a scalar, resulting in an array.
+pub fn ts_sub_scalar_ts(array: &ArrayRef, scalar: &ScalarValue) -> Result<ArrayRef> {
+ let ret = match (array.data_type(), scalar) {
+ (
+ DataType::Timestamp(TimeUnit::Second, opt_tz_lhs),
+ ScalarValue::TimestampSecond(Some(rhs), opt_tz_rhs),
+ ) => {
+ sub_timestamp_macro!(
+ array,
+ rhs,
+ as_timestamp_second_array,
+ IntervalDayTimeType,
+ opt_tz_lhs.as_deref(),
+ 1000,
+ opt_tz_rhs.as_deref(),
+ MILLISECOND_MODE,
+ seconds_sub,
+ NaiveDateTime::timestamp
+ )
+ }
+ (
+ DataType::Timestamp(TimeUnit::Millisecond, opt_tz_lhs),
+ ScalarValue::TimestampMillisecond(Some(rhs), opt_tz_rhs),
+ ) => {
+ sub_timestamp_macro!(
+ array,
+ rhs,
+ as_timestamp_millisecond_array,
+ IntervalDayTimeType,
+ opt_tz_lhs.as_deref(),
+ 1,
+ opt_tz_rhs.as_deref(),
+ MILLISECOND_MODE,
+ milliseconds_sub,
+ NaiveDateTime::timestamp_millis
+ )
+ }
+ (
+ DataType::Timestamp(TimeUnit::Microsecond, opt_tz_lhs),
+ ScalarValue::TimestampMicrosecond(Some(rhs), opt_tz_rhs),
+ ) => {
+ sub_timestamp_macro!(
+ array,
+ rhs,
+ as_timestamp_microsecond_array,
+ IntervalMonthDayNanoType,
+ opt_tz_lhs.as_deref(),
+ 1000,
+ opt_tz_rhs.as_deref(),
+ NANOSECOND_MODE,
+ microseconds_sub,
+ NaiveDateTime::timestamp_micros
+ )
+ }
+ (
+ DataType::Timestamp(TimeUnit::Nanosecond, opt_tz_lhs),
+ ScalarValue::TimestampNanosecond(Some(rhs), opt_tz_rhs),
+ ) => {
+ sub_timestamp_macro!(
+ array,
+ rhs,
+ as_timestamp_nanosecond_array,
+ IntervalMonthDayNanoType,
+ opt_tz_lhs.as_deref(),
+ 1,
+ opt_tz_rhs.as_deref(),
+ NANOSECOND_MODE,
+ nanoseconds_sub,
+ NaiveDateTime::timestamp_nanos
+ )
+ }
+ (_, _) => {
+ return Err(DataFusionError::Internal(format!(
+ "Invalid array - scalar types for Timestamp subtraction: {:?} - {:?}",
+ array.data_type(),
+ scalar.get_datatype()
+ )));
+ }
+ };
+ Ok(ret)
+}
+
+/// This function handles timestamp - timestamp operations where the former is
+/// a scalar and the latter is an array, resulting in an array.
+pub fn scalar_ts_sub_ts(scalar: &ScalarValue, array: &ArrayRef) -> Result<ArrayRef> {
+ let ret = match (scalar, array.data_type()) {
+ (
+ ScalarValue::TimestampSecond(Some(lhs), opt_tz_lhs),
+ DataType::Timestamp(TimeUnit::Second, opt_tz_rhs),
+ ) => {
+ sub_timestamp_left_scalar_macro!(
+ array,
+ lhs,
+ as_timestamp_second_array,
+ IntervalDayTimeType,
+ opt_tz_lhs.as_deref(),
+ 1000,
+ opt_tz_rhs.as_deref(),
+ MILLISECOND_MODE,
+ seconds_sub,
+ NaiveDateTime::timestamp
+ )
+ }
+ (
+ ScalarValue::TimestampMillisecond(Some(lhs), opt_tz_lhs),
+ DataType::Timestamp(TimeUnit::Millisecond, opt_tz_rhs),
+ ) => {
+ sub_timestamp_left_scalar_macro!(
+ array,
+ lhs,
+ as_timestamp_millisecond_array,
+ IntervalDayTimeType,
+ opt_tz_lhs.as_deref(),
+ 1,
+ opt_tz_rhs.as_deref(),
+ MILLISECOND_MODE,
+ milliseconds_sub,
+ NaiveDateTime::timestamp_millis
+ )
+ }
+ (
+ ScalarValue::TimestampMicrosecond(Some(lhs), opt_tz_lhs),
+ DataType::Timestamp(TimeUnit::Microsecond, opt_tz_rhs),
+ ) => {
+ sub_timestamp_left_scalar_macro!(
+ array,
+ lhs,
+ as_timestamp_microsecond_array,
+ IntervalMonthDayNanoType,
+ opt_tz_lhs.as_deref(),
+ 1000,
+ opt_tz_rhs.as_deref(),
+ NANOSECOND_MODE,
+ microseconds_sub,
+ NaiveDateTime::timestamp_micros
+ )
+ }
+ (
+ ScalarValue::TimestampNanosecond(Some(lhs), opt_tz_lhs),
+ DataType::Timestamp(TimeUnit::Nanosecond, opt_tz_rhs),
+ ) => {
+ sub_timestamp_left_scalar_macro!(
+ array,
+ lhs,
+ as_timestamp_nanosecond_array,
+ IntervalMonthDayNanoType,
+ opt_tz_lhs.as_deref(),
+ 1,
+ opt_tz_rhs.as_deref(),
+ NANOSECOND_MODE,
+ nanoseconds_sub,
+ NaiveDateTime::timestamp_nanos
+ )
+ }
+ (_, _) => {
+ return Err(DataFusionError::Internal(format!(
+ "Invalid scalar - array types for Timestamp subtraction: {:?} - {:?}",
+ scalar.get_datatype(),
+ array.data_type()
+ )));
+ }
+ };
+ Ok(ret)
+}
+
+/// Handles `timestamp +/- interval` operations where the timestamp side is an array and the interval
+/// side is a scalar, resulting in an array; an `Internal` error is returned when `array` is not a timestamp array.
+pub fn ts_op_scalar_interval(
+ array: &ArrayRef, // left-hand side; its data type selects the match arm below
+ sign: i32, // forwarded untouched to the macro; presumably +1 adds and -1 subtracts — confirm against callers
+ scalar: &ScalarValue, // right-hand side, forwarded untouched to the macro
+) -> Result<ArrayRef> {
+ let ret = match array.data_type() { // one arm per timestamp time unit
+ DataType::Timestamp(TimeUnit::Second, tz) => {
+ op_timestamp_interval_macro!(
+ array,
+ as_timestamp_second_array,
+ TimestampSecondType,
+ seconds_add,
+ scalar,
+ sign,
+ tz
+ )
+ }
+ DataType::Timestamp(TimeUnit::Millisecond, tz) => {
+ op_timestamp_interval_macro!(
+ array,
+ as_timestamp_millisecond_array,
+ TimestampMillisecondType,
+ milliseconds_add,
+ scalar,
+ sign,
+ tz
+ )
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, tz) => {
+ op_timestamp_interval_macro!(
+ array,
+ as_timestamp_microsecond_array,
+ TimestampMicrosecondType,
+ microseconds_add,
+ scalar,
+ sign,
+ tz
+ )
+ }
+ DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
+ op_timestamp_interval_macro!(
+ array,
+ as_timestamp_nanosecond_array,
+ TimestampNanosecondType,
+ nanoseconds_add,
+ scalar,
+ sign,
+ tz
+ )
+ }
+ _ => Err(DataFusionError::Internal(format!( // the trailing `?` turns this `Err` into an early return
+ "Invalid lhs type for Timestamp vs Interval operations: {}",
+ array.data_type()
+ )))?,
+ };
+ Ok(ret)
+}
+
+/// Handles `timestamp +/- interval` operations where the timestamp is a scalar
+/// and the intervals are an array, resulting in a timestamp array.
+///
+/// Every arm first converts the scalar into an optional `NaiveDateTime` and
+/// then hands it to `scalar_ts_op_interval_macro!` together with the interval
+/// caster, the output timestamp type, the interval type, the kernel that
+/// applies one interval, and the function that converts a `NaiveDateTime`
+/// back into a raw timestamp value.
+///
+/// The output timestamp type and the back-conversion always match the time
+/// unit of the input scalar. The `YearMonth` arms for millisecond, microsecond
+/// and nanosecond scalars previously passed `TimestampSecondType` and
+/// `NaiveDateTime::timestamp`, unlike the `DayTime` and `MonthDayNano` arms,
+/// which would produce a second-resolution array for sub-second inputs.
+///
+/// # Errors
+/// Returns `DataFusionError::Internal` when the scalar is not a non-null
+/// timestamp, when the array is not an interval array, or when splitting a
+/// nanosecond timestamp into seconds and nanoseconds fails.
+pub fn scalar_ts_op_interval(
+    scalar: &ScalarValue,
+    sign: i32,
+    array: &ArrayRef,
+) -> Result<ArrayRef> {
+    use DataType::*;
+    use IntervalUnit::*;
+    use ScalarValue::*;
+    let ret = match (scalar, array.data_type()) {
+        // Second op YearMonth
+        (TimestampSecond(Some(ts_sec), tz), Interval(YearMonth)) => {
+            let naive_date = NaiveDateTime::from_timestamp_opt(*ts_sec, 0);
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_ym_array,
+                TimestampSecondType,
+                IntervalYearMonthType,
+                shift_months,
+                NaiveDateTime::timestamp
+            )
+        }
+        // Millisecond op YearMonth
+        (TimestampMillisecond(Some(ts_ms), tz), Interval(YearMonth)) => {
+            let naive_date = NaiveDateTime::from_timestamp_millis(*ts_ms);
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_ym_array,
+                // Keep millisecond resolution in the output.
+                TimestampMillisecondType,
+                IntervalYearMonthType,
+                shift_months,
+                NaiveDateTime::timestamp_millis
+            )
+        }
+        // Microsecond op YearMonth
+        (TimestampMicrosecond(Some(ts_us), tz), Interval(YearMonth)) => {
+            let naive_date = NaiveDateTime::from_timestamp_micros(*ts_us);
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_ym_array,
+                // Keep microsecond resolution in the output.
+                TimestampMicrosecondType,
+                IntervalYearMonthType,
+                shift_months,
+                NaiveDateTime::timestamp_micros
+            )
+        }
+        // Nanosecond op YearMonth
+        (TimestampNanosecond(Some(ts_ns), tz), Interval(YearMonth)) => {
+            // Split into whole seconds and the non-negative nanosecond remainder.
+            let naive_date = NaiveDateTime::from_timestamp_opt(
+                ts_ns.div_euclid(1_000_000_000),
+                ts_ns.rem_euclid(1_000_000_000).try_into().map_err(|_| {
+                    DataFusionError::Internal("Overflow of divison".to_string())
+                })?,
+            );
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_ym_array,
+                // Keep nanosecond resolution in the output.
+                TimestampNanosecondType,
+                IntervalYearMonthType,
+                shift_months,
+                NaiveDateTime::timestamp_nanos
+            )
+        }
+        // Second op DayTime
+        (TimestampSecond(Some(ts_sec), tz), Interval(DayTime)) => {
+            let naive_date = NaiveDateTime::from_timestamp_opt(*ts_sec, 0);
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_dt_array,
+                TimestampSecondType,
+                IntervalDayTimeType,
+                add_day_time,
+                NaiveDateTime::timestamp
+            )
+        }
+        // Millisecond op DayTime
+        (TimestampMillisecond(Some(ts_ms), tz), Interval(DayTime)) => {
+            let naive_date = NaiveDateTime::from_timestamp_millis(*ts_ms);
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_dt_array,
+                TimestampMillisecondType,
+                IntervalDayTimeType,
+                add_day_time,
+                NaiveDateTime::timestamp_millis
+            )
+        }
+        // Microsecond op DayTime
+        (TimestampMicrosecond(Some(ts_us), tz), Interval(DayTime)) => {
+            let naive_date = NaiveDateTime::from_timestamp_micros(*ts_us);
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_dt_array,
+                TimestampMicrosecondType,
+                IntervalDayTimeType,
+                add_day_time,
+                NaiveDateTime::timestamp_micros
+            )
+        }
+        // Nanosecond op DayTime
+        (TimestampNanosecond(Some(ts_ns), tz), Interval(DayTime)) => {
+            let naive_date = NaiveDateTime::from_timestamp_opt(
+                ts_ns.div_euclid(1_000_000_000),
+                ts_ns.rem_euclid(1_000_000_000).try_into().map_err(|_| {
+                    DataFusionError::Internal("Overflow of divison".to_string())
+                })?,
+            );
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_dt_array,
+                TimestampNanosecondType,
+                IntervalDayTimeType,
+                add_day_time,
+                NaiveDateTime::timestamp_nanos
+            )
+        }
+        // Second op MonthDayNano
+        (TimestampSecond(Some(ts_sec), tz), Interval(MonthDayNano)) => {
+            let naive_date = NaiveDateTime::from_timestamp_opt(*ts_sec, 0);
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_mdn_array,
+                TimestampSecondType,
+                IntervalMonthDayNanoType,
+                add_m_d_nano,
+                NaiveDateTime::timestamp
+            )
+        }
+        // Millisecond op MonthDayNano
+        (TimestampMillisecond(Some(ts_ms), tz), Interval(MonthDayNano)) => {
+            let naive_date = NaiveDateTime::from_timestamp_millis(*ts_ms);
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_mdn_array,
+                TimestampMillisecondType,
+                IntervalMonthDayNanoType,
+                add_m_d_nano,
+                NaiveDateTime::timestamp_millis
+            )
+        }
+        // Microsecond op MonthDayNano
+        (TimestampMicrosecond(Some(ts_us), tz), Interval(MonthDayNano)) => {
+            let naive_date = NaiveDateTime::from_timestamp_micros(*ts_us);
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_mdn_array,
+                TimestampMicrosecondType,
+                IntervalMonthDayNanoType,
+                add_m_d_nano,
+                NaiveDateTime::timestamp_micros
+            )
+        }
+
+        // Nanosecond op MonthDayNano
+        (TimestampNanosecond(Some(ts_ns), tz), Interval(MonthDayNano)) => {
+            let naive_date = NaiveDateTime::from_timestamp_opt(
+                ts_ns.div_euclid(1_000_000_000),
+                ts_ns.rem_euclid(1_000_000_000).try_into().map_err(|_| {
+                    DataFusionError::Internal("Overflow of divison".to_string())
+                })?,
+            );
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_mdn_array,
+                TimestampNanosecondType,
+                IntervalMonthDayNanoType,
+                add_m_d_nano,
+                NaiveDateTime::timestamp_nanos
+            )
+        }
+        // Null scalars and non-timestamp/non-interval operands end up here.
+        _ => Err(DataFusionError::Internal(
+            "Invalid types for Timestamp vs Interval operations".to_string(),
+        ))?,
+    };
+    Ok(ret)
+}
+
+/// Handles `interval +/- interval` operations where the left operand is an array and the right operand is
+/// a non-null scalar, resulting in an interval array; any other operand pairing yields an `Internal` error.
+pub fn interval_op_scalar_interval(
+ array: &ArrayRef, // left-hand side; its interval unit is the first half of the dispatch key
+ sign: i32, // forwarded untouched to the kernels; presumably +1 adds and -1 subtracts — confirm against callers
+ scalar: &ScalarValue, // right-hand side; only `Some(..)` interval scalars match an arm
+) -> Result<ArrayRef> {
+ use DataType::*;
+ use IntervalUnit::*;
+ use ScalarValue::*;
+ let ret = match (array.data_type(), scalar) {
+ (Interval(YearMonth), IntervalYearMonth(Some(rhs))) => { // same unit on both sides
+ op_interval_macro!(
+ array,
+ as_interval_ym_array,
+ IntervalYearMonthType,
+ op_ym,
+ rhs,
+ sign
+ )
+ }
+ (Interval(YearMonth), IntervalDayTime(Some(rhs))) => { // mixed units go through the cross macro
+ op_interval_cross_macro!(
+ array,
+ as_interval_ym_array,
+ false, // NOTE(review): presumably "operands are not swapped" — confirm in the macro
+ op_ym_dt,
+ rhs,
+ sign,
+ i32,
+ i64
+ )
+ }
+ (Interval(YearMonth), IntervalMonthDayNano(Some(rhs))) => {
+ op_interval_cross_macro!(
+ array,
+ as_interval_ym_array,
+ false,
+ op_ym_mdn,
+ rhs,
+ sign,
+ i32,
+ i128
+ )
+ }
+ (Interval(DayTime), IntervalYearMonth(Some(rhs))) => {
+ op_interval_cross_macro!(
+ array,
+ as_interval_dt_array,
+ true, // NOTE(review): the kernel name lists YearMonth first while the array is DayTime; presumably marks swapped operands — confirm
+ op_ym_dt,
+ rhs,
+ sign,
+ i32,
+ i64
+ )
+ }
+ (Interval(DayTime), IntervalDayTime(Some(rhs))) => { // same unit on both sides
+ op_interval_macro!(
+ array,
+ as_interval_dt_array,
+ IntervalDayTimeType,
+ op_dt,
+ rhs,
+ sign
+ )
+ }
+ (Interval(DayTime), IntervalMonthDayNano(Some(rhs))) => {
+ op_interval_cross_macro!(
+ array,
+ as_interval_dt_array,
+ false,
+ op_dt_mdn,
+ rhs,
+ sign,
+ i64,
+ i128
+ )
+ }
+ (Interval(MonthDayNano), IntervalYearMonth(Some(rhs))) => {
+ op_interval_cross_macro!(
+ array,
+ as_interval_mdn_array,
+ true,
+ op_ym_mdn,
+ rhs,
+ sign,
+ i32,
+ i128
+ )
+ }
+ (Interval(MonthDayNano), IntervalDayTime(Some(rhs))) => {
+ op_interval_cross_macro!(
+ array,
+ as_interval_mdn_array,
+ true,
+ op_dt_mdn,
+ rhs,
+ sign,
+ i64,
+ i128
+ )
+ }
+ (Interval(MonthDayNano), IntervalMonthDayNano(Some(rhs))) => { // same unit on both sides
+ op_interval_macro!(
+ array,
+ as_interval_mdn_array,
+ IntervalMonthDayNanoType,
+ op_mdn,
+ rhs,
+ sign
+ )
+ }
+ _ => Err(DataFusionError::Internal(format!( // also reached for null scalars; `?` makes this an early return
+ "Invalid operands for Interval vs Interval operations: {} - {}",
+ array.data_type(),
+ scalar.get_datatype(),
+ )))?,
+ };
+ Ok(ret)
+}
+
+/// Evaluates `interval +/- interval` where the left operand is a scalar and
+/// the right operand is an array, producing an interval array.
+///
+/// Operands of the same unit keep that unit; every mixed-unit pairing is
+/// computed into a `MonthDayNano` array. The cross-unit kernels (`op_ym_dt`,
+/// `op_ym_mdn`, `op_dt_mdn`) take their arguments in a fixed unit order, so
+/// when the scalar is the "second" unit the array value is passed first and
+/// the trailing flag is `true`.
+///
+/// Null scalars and non-interval operands produce a
+/// `DataFusionError::Internal`.
+pub fn scalar_interval_op_interval(
+    scalar: &ScalarValue,
+    sign: i32,
+    array: &ArrayRef,
+) -> Result<ArrayRef> {
+    use DataType::*;
+    use IntervalUnit::*;
+    use ScalarValue::*;
+    let result = match (scalar, array.data_type()) {
+        // ---- YearMonth scalar ----
+        (IntervalYearMonth(Some(lhs)), Interval(YearMonth)) => {
+            let rhs_values = as_interval_ym_array(&array)?;
+            let out: PrimitiveArray<IntervalYearMonthType> =
+                unary(rhs_values, |rhs| op_ym(*lhs, rhs, sign));
+            Arc::new(out) as ArrayRef
+        }
+        (IntervalYearMonth(Some(lhs)), Interval(DayTime)) => {
+            let rhs_values = as_interval_dt_array(&array)?;
+            let out: PrimitiveArray<IntervalMonthDayNanoType> =
+                unary(rhs_values, |rhs| op_ym_dt(*lhs, rhs, sign, false));
+            Arc::new(out) as ArrayRef
+        }
+        (IntervalYearMonth(Some(lhs)), Interval(MonthDayNano)) => {
+            let rhs_values = as_interval_mdn_array(&array)?;
+            let out: PrimitiveArray<IntervalMonthDayNanoType> =
+                unary(rhs_values, |rhs| op_ym_mdn(*lhs, rhs, sign, false));
+            Arc::new(out) as ArrayRef
+        }
+        // ---- DayTime scalar ----
+        (IntervalDayTime(Some(lhs)), Interval(YearMonth)) => {
+            let rhs_values = as_interval_ym_array(&array)?;
+            let out: PrimitiveArray<IntervalMonthDayNanoType> =
+                unary(rhs_values, |rhs| op_ym_dt(rhs, *lhs, sign, true));
+            Arc::new(out) as ArrayRef
+        }
+        (IntervalDayTime(Some(lhs)), Interval(DayTime)) => {
+            let rhs_values = as_interval_dt_array(&array)?;
+            let out: PrimitiveArray<IntervalDayTimeType> =
+                unary(rhs_values, |rhs| op_dt(*lhs, rhs, sign));
+            Arc::new(out) as ArrayRef
+        }
+        (IntervalDayTime(Some(lhs)), Interval(MonthDayNano)) => {
+            let rhs_values = as_interval_mdn_array(&array)?;
+            let out: PrimitiveArray<IntervalMonthDayNanoType> =
+                unary(rhs_values, |rhs| op_dt_mdn(*lhs, rhs, sign, false));
+            Arc::new(out) as ArrayRef
+        }
+        // ---- MonthDayNano scalar ----
+        (IntervalMonthDayNano(Some(lhs)), Interval(YearMonth)) => {
+            let rhs_values = as_interval_ym_array(&array)?;
+            let out: PrimitiveArray<IntervalMonthDayNanoType> =
+                unary(rhs_values, |rhs| op_ym_mdn(rhs, *lhs, sign, true));
+            Arc::new(out) as ArrayRef
+        }
+        (IntervalMonthDayNano(Some(lhs)), Interval(DayTime)) => {
+            let rhs_values = as_interval_dt_array(&array)?;
+            let out: PrimitiveArray<IntervalMonthDayNanoType> =
+                unary(rhs_values, |rhs| op_dt_mdn(rhs, *lhs, sign, true));
+            Arc::new(out) as ArrayRef
+        }
+        (IntervalMonthDayNano(Some(lhs)), Interval(MonthDayNano)) => {
+            let rhs_values = as_interval_mdn_array(&array)?;
+            let out: PrimitiveArray<IntervalMonthDayNanoType> =
+                unary(rhs_values, |rhs| op_mdn(*lhs, rhs, sign));
+            Arc::new(out) as ArrayRef
+        }
+        // Anything else (including null scalars) is rejected.
+        _ => {
+            return Err(DataFusionError::Internal(format!(
+                "Invalid operands for Interval vs Interval operations: {} - {}",
+                scalar.get_datatype(),
+                array.data_type(),
+            )))
+        }
+    };
+    Ok(result)
+}
+
+/// Performs a timestamp subtraction (`array_lhs - array_rhs`) on two arrays and returns the resulting array; both sides must be timestamps of the same time unit, otherwise an `Execution` error is returned.
+pub fn ts_array_op(array_lhs: &ArrayRef, array_rhs: &ArrayRef) -> Result<ArrayRef> {
+ use DataType::*;
+ use TimeUnit::*;
+ match (array_lhs.data_type(), array_rhs.data_type()) { // time zones may differ per side; both are forwarded to the macro
+ (Timestamp(Second, opt_tz_lhs), Timestamp(Second, opt_tz_rhs)) => Ok(ts_sub_op!(
+ array_lhs,
+ array_rhs,
+ opt_tz_lhs,
+ opt_tz_rhs,
+ 1000i64, // NOTE(review): presumably scales seconds to the millisecond result mode — confirm in ts_sub_op!
+ as_timestamp_second_array,
+ seconds_sub,
+ NaiveDateTime::timestamp,
+ MILLISECOND_MODE,
+ IntervalDayTimeType
+ )),
+ (Timestamp(Millisecond, opt_tz_lhs), Timestamp(Millisecond, opt_tz_rhs)) => {
+ Ok(ts_sub_op!(
+ array_lhs,
+ array_rhs,
+ opt_tz_lhs,
+ opt_tz_rhs,
+ 1i64,
+ as_timestamp_millisecond_array,
+ milliseconds_sub,
+ NaiveDateTime::timestamp_millis,
+ MILLISECOND_MODE,
+ IntervalDayTimeType
+ ))
+ }
+ (Timestamp(Microsecond, opt_tz_lhs), Timestamp(Microsecond, opt_tz_rhs)) => {
+ Ok(ts_sub_op!(
+ array_lhs,
+ array_rhs,
+ opt_tz_lhs,
+ opt_tz_rhs,
+ 1000i64, // NOTE(review): presumably scales microseconds to the nanosecond result mode — confirm in ts_sub_op!
+ as_timestamp_microsecond_array,
+ microseconds_sub,
+ NaiveDateTime::timestamp_micros,
+ NANOSECOND_MODE,
+ IntervalMonthDayNanoType
+ ))
+ }
+ (Timestamp(Nanosecond, opt_tz_lhs), Timestamp(Nanosecond, opt_tz_rhs)) => {
+ Ok(ts_sub_op!(
+ array_lhs,
+ array_rhs,
+ opt_tz_lhs,
+ opt_tz_rhs,
+ 1i64,
+ as_timestamp_nanosecond_array,
+ nanoseconds_sub,
+ NaiveDateTime::timestamp_nanos,
+ NANOSECOND_MODE,
+ IntervalMonthDayNanoType
+ ))
+ }
+ (_, _) => Err(DataFusionError::Execution(format!( // mixed time units or non-timestamp inputs
+ "Invalid array types for Timestamp subtraction: {} - {}",
+ array_lhs.data_type(),
+ array_rhs.data_type()
+ ))),
+ }
+}
+/// Performs an interval operation on two arrays and returns the resulting array.
+/// The operation sign determines whether to perform addition or subtraction (it is forwarded untouched to the kernels).
+/// Both inputs must be interval arrays; any other pairing of data types yields an `Execution` error.
+pub fn interval_array_op(
+ array_lhs: &ArrayRef,
+ array_rhs: &ArrayRef,
+ sign: i32,
+) -> Result<ArrayRef> {
+ use DataType::*;
+ use IntervalUnit::*;
+ match (array_lhs.data_type(), array_rhs.data_type()) {
+ (Interval(YearMonth), Interval(YearMonth)) => Ok(interval_op!( // same unit on both sides
+ array_lhs,
+ array_rhs,
+ as_interval_ym_array,
+ op_ym,
+ sign,
+ IntervalYearMonthType
+ )),
+ (Interval(YearMonth), Interval(DayTime)) => Ok(interval_cross_op!( // mixed units go through the cross macro
+ array_lhs,
+ array_rhs,
+ as_interval_ym_array,
+ as_interval_dt_array,
+ op_ym_dt,
+ sign,
+ false, // NOTE(review): presumably "operands are not swapped" — confirm in the macro
+ IntervalYearMonthType,
+ IntervalDayTimeType
+ )),
+ (Interval(YearMonth), Interval(MonthDayNano)) => Ok(interval_cross_op!(
+ array_lhs,
+ array_rhs,
+ as_interval_ym_array,
+ as_interval_mdn_array,
+ op_ym_mdn,
+ sign,
+ false,
+ IntervalYearMonthType,
+ IntervalMonthDayNanoType
+ )),
+ (Interval(DayTime), Interval(YearMonth)) => Ok(interval_cross_op!(
+ array_rhs, // rhs is passed first here so the YearMonth array lines up with the YearMonth caster
+ array_lhs,
+ as_interval_ym_array,
+ as_interval_dt_array,
+ op_ym_dt,
+ sign,
+ true, // NOTE(review): presumably marks the swapped operand order — confirm in the macro
+ IntervalYearMonthType,
+ IntervalDayTimeType
+ )),
+ (Interval(DayTime), Interval(DayTime)) => Ok(interval_op!( // same unit on both sides
+ array_lhs,
+ array_rhs,
+ as_interval_dt_array,
+ op_dt,
+ sign,
+ IntervalDayTimeType
+ )),
+ (Interval(DayTime), Interval(MonthDayNano)) => Ok(interval_cross_op!(
+ array_lhs,
+ array_rhs,
+ as_interval_dt_array,
+ as_interval_mdn_array,
+ op_dt_mdn,
+ sign,
+ false,
+ IntervalDayTimeType,
+ IntervalMonthDayNanoType
+ )),
+ (Interval(MonthDayNano), Interval(YearMonth)) => Ok(interval_cross_op!(
+ array_rhs, // swapped: the YearMonth array goes first
+ array_lhs,
+ as_interval_ym_array,
+ as_interval_mdn_array,
+ op_ym_mdn,
+ sign,
+ true,
+ IntervalYearMonthType,
+ IntervalMonthDayNanoType
+ )),
+ (Interval(MonthDayNano), Interval(DayTime)) => Ok(interval_cross_op!(
+ array_rhs, // swapped: the DayTime array goes first
+ array_lhs,
+ as_interval_dt_array,
+ as_interval_mdn_array,
+ op_dt_mdn,
+ sign,
+ true,
+ IntervalDayTimeType,
+ IntervalMonthDayNanoType
+ )),
+ (Interval(MonthDayNano), Interval(MonthDayNano)) => Ok(interval_op!( // same unit on both sides
+ array_lhs,
+ array_rhs,
+ as_interval_mdn_array,
+ op_mdn,
+ sign,
+ IntervalMonthDayNanoType
+ )),
+ (_, _) => Err(DataFusionError::Execution(format!( // at least one side is not an interval array
+ "Invalid array types for Interval operation: {} {} {}",
+ array_lhs.data_type(),
+ sign,
+ array_rhs.data_type()
+ ))),
+ }
+}
+
+/// Performs a timestamp/interval operation on two arrays and returns the resulting array.
+/// The operation sign determines whether to perform addition or subtraction (it is forwarded untouched to the macro).
+/// The left array must be a timestamp and the right array an interval; any other pairing yields an `Execution` error.
+pub fn ts_interval_array_op(
+ array_lhs: &ArrayRef,
+ sign: i32,
+ array_rhs: &ArrayRef,
+) -> Result<ArrayRef> {
+ use DataType::*;
+ use IntervalUnit::*;
+ use TimeUnit::*;
+ match (array_lhs.data_type(), array_rhs.data_type()) { // 4 time units x 3 interval units = 12 supported pairings
+ (Timestamp(Second, tz), Interval(YearMonth)) => Ok(ts_interval_op!(
+ array_lhs,
+ array_rhs,
+ tz,
+ as_timestamp_second_array,
+ as_interval_ym_array,
+ seconds_add_array::<YM_MODE>, // the const parameter selects the interval flavour of the kernel
+ sign,
+ TimestampSecondType,
+ IntervalYearMonthType
+ )),
+ (Timestamp(Second, tz), Interval(DayTime)) => Ok(ts_interval_op!(
+ array_lhs,
+ array_rhs,
+ tz,
+ as_timestamp_second_array,
+ as_interval_dt_array,
+ seconds_add_array::<DT_MODE>,
+ sign,
+ TimestampSecondType,
+ IntervalDayTimeType
+ )),
+ (Timestamp(Second, tz), Interval(MonthDayNano)) => Ok(ts_interval_op!(
+ array_lhs,
+ array_rhs,
+ tz,
+ as_timestamp_second_array,
+ as_interval_mdn_array,
+ seconds_add_array::<MDN_MODE>,
+ sign,
+ TimestampSecondType,
+ IntervalMonthDayNanoType
+ )),
+ (Timestamp(Millisecond, tz), Interval(YearMonth)) => Ok(ts_interval_op!(
+ array_lhs,
+ array_rhs,
+ tz,
+ as_timestamp_millisecond_array,
+ as_interval_ym_array,
+ milliseconds_add_array::<YM_MODE>,
+ sign,
+ TimestampMillisecondType,
+ IntervalYearMonthType
+ )),
+ (Timestamp(Millisecond, tz), Interval(DayTime)) => Ok(ts_interval_op!(
+ array_lhs,
+ array_rhs,
+ tz,
+ as_timestamp_millisecond_array,
+ as_interval_dt_array,
+ milliseconds_add_array::<DT_MODE>,
+ sign,
+ TimestampMillisecondType,
+ IntervalDayTimeType
+ )),
+ (Timestamp(Millisecond, tz), Interval(MonthDayNano)) => Ok(ts_interval_op!(
+ array_lhs,
+ array_rhs,
+ tz,
+ as_timestamp_millisecond_array,
+ as_interval_mdn_array,
+ milliseconds_add_array::<MDN_MODE>,
+ sign,
+ TimestampMillisecondType,
+ IntervalMonthDayNanoType
+ )),
+ (Timestamp(Microsecond, tz), Interval(YearMonth)) => Ok(ts_interval_op!(
+ array_lhs,
+ array_rhs,
+ tz,
+ as_timestamp_microsecond_array,
+ as_interval_ym_array,
+ microseconds_add_array::<YM_MODE>,
+ sign,
+ TimestampMicrosecondType,
+ IntervalYearMonthType
+ )),
+ (Timestamp(Microsecond, tz), Interval(DayTime)) => Ok(ts_interval_op!(
+ array_lhs,
+ array_rhs,
+ tz,
+ as_timestamp_microsecond_array,
+ as_interval_dt_array,
+ microseconds_add_array::<DT_MODE>,
+ sign,
+ TimestampMicrosecondType,
+ IntervalDayTimeType
+ )),
+ (Timestamp(Microsecond, tz), Interval(MonthDayNano)) => Ok(ts_interval_op!(
+ array_lhs,
+ array_rhs,
+ tz,
+ as_timestamp_microsecond_array,
+ as_interval_mdn_array,
+ microseconds_add_array::<MDN_MODE>,
+ sign,
+ TimestampMicrosecondType,
+ IntervalMonthDayNanoType
+ )),
+ (Timestamp(Nanosecond, tz), Interval(YearMonth)) => Ok(ts_interval_op!(
+ array_lhs,
+ array_rhs,
+ tz,
+ as_timestamp_nanosecond_array,
+ as_interval_ym_array,
+ nanoseconds_add_array::<YM_MODE>,
+ sign,
+ TimestampNanosecondType,
+ IntervalYearMonthType
+ )),
+ (Timestamp(Nanosecond, tz), Interval(DayTime)) => Ok(ts_interval_op!(
+ array_lhs,
+ array_rhs,
+ tz,
+ as_timestamp_nanosecond_array,
+ as_interval_dt_array,
+ nanoseconds_add_array::<DT_MODE>,
+ sign,
+ TimestampNanosecondType,
+ IntervalDayTimeType
+ )),
+ (Timestamp(Nanosecond, tz), Interval(MonthDayNano)) => Ok(ts_interval_op!(
+ array_lhs,
+ array_rhs,
+ tz,
+ as_timestamp_nanosecond_array,
+ as_interval_mdn_array,
+ nanoseconds_add_array::<MDN_MODE>,
+ sign,
+ TimestampNanosecondType,
+ IntervalMonthDayNanoType
+ )),
+ (_, _) => Err(DataFusionError::Execution(format!( // lhs is not a timestamp or rhs is not an interval
+ "Invalid array types for Timestamp Interval operation: {} {} {}",
+ array_lhs.data_type(),
+ sign,
+ array_rhs.data_type()
+ ))),
+ }
+}
+
+/// Applies `month_op` to the date `prior` once per `YearMonth` interval in
+/// `right` and encodes each resulting date as the number of days between it
+/// and `epoch` (a `Date32` array).
+///
+/// Fails with an internal error when an interval is negative (it cannot be
+/// converted to an unsigned month count) or when `month_op` returns `None`.
+#[inline]
+pub fn date32_interval_ym_op(
+    right: &Arc<dyn Array>,
+    epoch: &NaiveDate,
+    prior: &NaiveDate,
+    month_op: fn(NaiveDate, Months) -> Option<NaiveDate>,
+) -> Result<ArrayRef> {
+    let intervals: &PrimitiveArray<IntervalYearMonthType> = right.as_primitive();
+    let dates = try_unary::<IntervalYearMonthType, _, Date32Type>(intervals, |ym| {
+        // `Months` only accepts an unsigned count.
+        let month_count: u32 = ym.try_into().map_err(|_| {
+            DataFusionError::Internal(
+                "Interval values cannot be casted as unsigned integers".to_string(),
+            )
+        })?;
+        let shifted = month_op(*prior, Months::new(month_count)).ok_or_else(|| {
+            DataFusionError::Internal("Resulting date is out of range".to_string())
+        })?;
+        let days_since_epoch = (shifted - *epoch).num_days();
+        Ok(days_since_epoch as i32)
+    })?;
+    Ok(Arc::new(dates) as ArrayRef)
+}
+
+/// Applies `day_op` to the date `prior` once per `DayTime` interval in
+/// `right` and encodes each result as days relative to `epoch` (a `Date32`
+/// array).
+///
+/// The whole days contained in the millisecond part of the interval are
+/// subtracted from the result.
+/// NOTE(review): that part is always subtracted, regardless of `day_op`;
+/// presumably callers only pass a subtracting `day_op` — confirm.
+///
+/// Fails with an internal error when the day count is negative or when
+/// `day_op` returns `None`.
+#[inline]
+pub fn date32_interval_dt_op(
+    right: &Arc<dyn Array>,
+    epoch: &NaiveDate,
+    prior: &NaiveDate,
+    day_op: fn(NaiveDate, Days) -> Option<NaiveDate>,
+) -> Result<ArrayRef> {
+    let intervals: &PrimitiveArray<IntervalDayTimeType> = right.as_primitive();
+    let dates = try_unary::<IntervalDayTimeType, _, Date32Type>(intervals, |dt| {
+        let (day_part, milli_part) = IntervalDayTimeType::to_parts(dt);
+        // `Days` only accepts an unsigned count.
+        let day_count: u64 = day_part.try_into().map_err(|_| {
+            DataFusionError::Internal(
+                "Interval values cannot be casted as unsigned integers".to_string(),
+            )
+        })?;
+        let shifted = day_op(*prior, Days::new(day_count)).ok_or_else(|| {
+            DataFusionError::Internal("Resulting date is out of range".to_string())
+        })?;
+        let whole_days_in_millis = milli_part as i64 / MILLISECONDS_IN_DAY;
+        let days_since_epoch = (shifted - *epoch).num_days();
+        Ok((days_since_epoch - whole_days_in_millis) as i32)
+    })?;
+    Ok(Arc::new(dates) as ArrayRef)
+}
+
+/// Applies `month_op` and then `day_op` to the date `prior` once per
+/// `MonthDayNano` interval in `right` and encodes each result as days
+/// relative to `epoch` (a `Date32` array).
+///
+/// The whole days contained in the nanosecond part of the interval are
+/// subtracted from the result.
+/// NOTE(review): that part is always subtracted, regardless of the supplied
+/// operations; presumably callers only pass subtracting ops — confirm.
+///
+/// Fails with an internal error when the month or day count is negative or
+/// when either operation returns `None`.
+#[inline]
+pub fn date32_interval_mdn_op(
+    right: &Arc<dyn Array>,
+    epoch: &NaiveDate,
+    prior: &NaiveDate,
+    day_op: fn(NaiveDate, Days) -> Option<NaiveDate>,
+    month_op: fn(NaiveDate, Months) -> Option<NaiveDate>,
+) -> Result<ArrayRef> {
+    // Error for interval parts that do not fit an unsigned integer.
+    fn not_unsigned<E>(_: E) -> DataFusionError {
+        DataFusionError::Internal(
+            "Interval values cannot be casted as unsigned integers".to_string(),
+        )
+    }
+    // Error for dates that leave the supported range.
+    fn date_overflow() -> DataFusionError {
+        DataFusionError::Internal("Resulting date is out of range".to_string())
+    }
+    let intervals: &PrimitiveArray<IntervalMonthDayNanoType> = right.as_primitive();
+    let dates = try_unary::<IntervalMonthDayNanoType, _, Date32Type>(intervals, |mdn| {
+        let (month_part, day_part, nano_part) = IntervalMonthDayNanoType::to_parts(mdn);
+        let month_count: u32 = month_part.try_into().map_err(not_unsigned)?;
+        let after_months =
+            month_op(*prior, Months::new(month_count)).ok_or_else(date_overflow)?;
+        let day_count: u64 = day_part.try_into().map_err(not_unsigned)?;
+        let after_days =
+            day_op(after_months, Days::new(day_count)).ok_or_else(date_overflow)?;
+        let whole_days_in_nanos = nano_part / NANOSECONDS_IN_DAY;
+        let days_since_epoch = (after_days - *epoch).num_days();
+        Ok((days_since_epoch - whole_days_in_nanos) as i32)
+    })?;
+    Ok(Arc::new(dates) as ArrayRef)
+}
+
+/// Applies `month_op` to the date `prior` once per `YearMonth` interval in
+/// `right` and encodes each resulting date as the number of milliseconds
+/// between it and `epoch` (a `Date64` array).
+///
+/// Fails with an internal error when an interval is negative (it cannot be
+/// converted to an unsigned month count) or when `month_op` returns `None`.
+#[inline]
+pub fn date64_interval_ym_op(
+    right: &Arc<dyn Array>,
+    epoch: &NaiveDate,
+    prior: &NaiveDate,
+    month_op: fn(NaiveDate, Months) -> Option<NaiveDate>,
+) -> Result<ArrayRef> {
+    let intervals: &PrimitiveArray<IntervalYearMonthType> = right.as_primitive();
+    let dates = try_unary::<IntervalYearMonthType, _, Date64Type>(intervals, |ym| {
+        // `Months` only accepts an unsigned count.
+        let month_count: u32 = ym.try_into().map_err(|_| {
+            DataFusionError::Internal(
+                "Interval values cannot be casted as unsigned integers".to_string(),
+            )
+        })?;
+        let shifted = month_op(*prior, Months::new(month_count)).ok_or_else(|| {
+            DataFusionError::Internal("Resulting date is out of range".to_string())
+        })?;
+        Ok((shifted - *epoch).num_milliseconds())
+    })?;
+    Ok(Arc::new(dates) as ArrayRef)
+}
+
+/// Applies `day_op` to the date `prior` once per `DayTime` interval in
+/// `right` and encodes each result as milliseconds relative to `epoch` (a
+/// `Date64` array).
+///
+/// The millisecond part of the interval is subtracted from the result.
+/// NOTE(review): that part is always subtracted, regardless of `day_op`;
+/// presumably callers only pass a subtracting `day_op` — confirm.
+///
+/// Fails with an internal error when the day count is negative or when
+/// `day_op` returns `None`.
+#[inline]
+pub fn date64_interval_dt_op(
+    right: &Arc<dyn Array>,
+    epoch: &NaiveDate,
+    prior: &NaiveDate,
+    day_op: fn(NaiveDate, Days) -> Option<NaiveDate>,
+) -> Result<ArrayRef> {
+    let intervals: &PrimitiveArray<IntervalDayTimeType> = right.as_primitive();
+    let dates = try_unary::<IntervalDayTimeType, _, Date64Type>(intervals, |dt| {
+        let (day_part, milli_part) = IntervalDayTimeType::to_parts(dt);
+        // `Days` only accepts an unsigned count.
+        let day_count: u64 = day_part.try_into().map_err(|_| {
+            DataFusionError::Internal(
+                "Interval values cannot be casted as unsigned integers".to_string(),
+            )
+        })?;
+        let shifted = day_op(*prior, Days::new(day_count)).ok_or_else(|| {
+            DataFusionError::Internal("Resulting date is out of range".to_string())
+        })?;
+        let sub_day_millis = milli_part as i64;
+        Ok((shifted - *epoch).num_milliseconds() - sub_day_millis)
+    })?;
+    Ok(Arc::new(dates) as ArrayRef)
+}
+
+/// Applies `month_op` and then `day_op` to the date `prior` once per
+/// `MonthDayNano` interval in `right` and encodes each result as
+/// milliseconds relative to `epoch` (a `Date64` array).
+///
+/// The nanosecond part of the interval, truncated to whole milliseconds, is
+/// subtracted from the result.
+/// NOTE(review): that part is always subtracted, regardless of the supplied
+/// operations; presumably callers only pass subtracting ops — confirm.
+///
+/// Fails with an internal error when the month or day count is negative or
+/// when either operation returns `None`.
+#[inline]
+pub fn date64_interval_mdn_op(
+    right: &Arc<dyn Array>,
+    epoch: &NaiveDate,
+    prior: &NaiveDate,
+    day_op: fn(NaiveDate, Days) -> Option<NaiveDate>,
+    month_op: fn(NaiveDate, Months) -> Option<NaiveDate>,
+) -> Result<ArrayRef> {
+    // Error for interval parts that do not fit an unsigned integer.
+    fn not_unsigned<E>(_: E) -> DataFusionError {
+        DataFusionError::Internal(
+            "Interval values cannot be casted as unsigned integers".to_string(),
+        )
+    }
+    // Error for dates that leave the supported range.
+    fn date_overflow() -> DataFusionError {
+        DataFusionError::Internal("Resulting date is out of range".to_string())
+    }
+    let intervals: &PrimitiveArray<IntervalMonthDayNanoType> = right.as_primitive();
+    let dates = try_unary::<IntervalMonthDayNanoType, _, Date64Type>(intervals, |mdn| {
+        let (month_part, day_part, nano_part) = IntervalMonthDayNanoType::to_parts(mdn);
+        let month_count: u32 = month_part.try_into().map_err(not_unsigned)?;
+        let after_months =
+            month_op(*prior, Months::new(month_count)).ok_or_else(date_overflow)?;
+        let day_count: u64 = day_part.try_into().map_err(not_unsigned)?;
+        let after_days =
+            day_op(after_months, Days::new(day_count)).ok_or_else(date_overflow)?;
+        let sub_day_millis = nano_part / 1_000_000;
+        Ok((after_days - *epoch).num_milliseconds() - sub_day_millis)
+    })?;
+    Ok(Arc::new(dates) as ArrayRef)
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/datafusion/physical-expr/src/expressions/datetime.rs b/datafusion/physical-expr/src/expressions/datetime.rs
index 976ef845aa..f1933c1d18 100644
--- a/datafusion/physical-expr/src/expressions/datetime.rs
+++ b/datafusion/physical-expr/src/expressions/datetime.rs
@@ -117,16 +117,17 @@ impl PhysicalExpr for DateTimeIntervalExpr {
// This function evaluates temporal array vs scalar operations, such as timestamp - timestamp,
// interval + interval, timestamp + interval, and interval + timestamp. It takes one array and one scalar as input
// and an integer sign representing the operation (+1 for addition and -1 for subtraction).
- (ColumnarValue::Array(array_lhs), ColumnarValue::Scalar(array_rhs)) => {
- resolve_temporal_op_scalar(&array_lhs, sign, &array_rhs)
+ (ColumnarValue::Array(arr), ColumnarValue::Scalar(scalar)) => {
+ Ok(ColumnarValue::Array(resolve_temporal_op_scalar(
+ &arr, sign, &scalar, false,
+ )?))
}
// This function evaluates operations between a scalar value and an array of temporal
// values. One example is calculating the duration between a scalar timestamp and an
// array of timestamps (i.e. `now() - some_column`).
- (ColumnarValue::Scalar(scalar_lhs), ColumnarValue::Array(array_rhs)) => {
- let array_lhs = scalar_lhs.to_array_of_size(array_rhs.len());
- Ok(ColumnarValue::Array(resolve_temporal_op(
- &array_lhs, sign, &array_rhs,
+ (ColumnarValue::Scalar(scalar), ColumnarValue::Array(arr)) => {
+ Ok(ColumnarValue::Array(resolve_temporal_op_scalar(
+ &arr, sign, &scalar, true,
)?))
}
// This function evaluates temporal array operations, such as timestamp - timestamp, interval + interval,
@@ -596,59 +597,77 @@ mod tests {
let interval_array = interval_scalar.to_array();
// timestamp + interval
- if let ColumnarValue::Array(res1) =
- resolve_temporal_op_scalar(&timestamp_array, 1, &interval_scalar)?
- {
- let res2 = timestamp_scalar.add(&interval_scalar)?.to_array();
- assert_eq!(
- &res1, &res2,
- "Timestamp Scalar={timestamp_scalar} + Interval Scalar={interval_scalar}"
- );
- }
+ let res1 =
+ resolve_temporal_op_scalar(&timestamp_array, 1, &interval_scalar, false)?;
+ let res2 = timestamp_scalar.add(&interval_scalar)?.to_array();
+ assert_eq!(
+ &res1, &res2,
+ "Timestamp Scalar={timestamp_scalar} + Interval Scalar={interval_scalar}"
+ );
+ let res1 =
+ resolve_temporal_op_scalar(&timestamp_array, 1, &interval_scalar, true)?;
+ let res2 = interval_scalar.add(&timestamp_scalar)?.to_array();
+ assert_eq!(
+ &res1, &res2,
+ "Timestamp Scalar={timestamp_scalar} + Interval Scalar={interval_scalar}"
+ );
// timestamp - interval
- if let ColumnarValue::Array(res1) =
- resolve_temporal_op_scalar(&timestamp_array, -1, &interval_scalar)?
- {
- let res2 = timestamp_scalar.sub(&interval_scalar)?.to_array();
- assert_eq!(
- &res1, &res2,
- "Timestamp Scalar={timestamp_scalar} - Interval Scalar={interval_scalar}"
- );
- }
+ let res1 =
+ resolve_temporal_op_scalar(&timestamp_array, -1, &interval_scalar, false)?;
+ let res2 = timestamp_scalar.sub(&interval_scalar)?.to_array();
+ assert_eq!(
+ &res1, &res2,
+ "Timestamp Scalar={timestamp_scalar} - Interval Scalar={interval_scalar}"
+ );
// timestamp - timestamp
- if let ColumnarValue::Array(res1) =
- resolve_temporal_op_scalar(&timestamp_array, -1, &timestamp_scalar)?
- {
- let res2 = timestamp_scalar.sub(&timestamp_scalar)?.to_array();
- assert_eq!(
- &res1, &res2,
- "Timestamp Scalar={timestamp_scalar} - Timestamp Scalar={timestamp_scalar}"
- );
- }
+ let res1 =
+ resolve_temporal_op_scalar(&timestamp_array, -1, &timestamp_scalar, false)?;
+ let res2 = timestamp_scalar.sub(&timestamp_scalar)?.to_array();
+ assert_eq!(
+ &res1, &res2,
+ "Timestamp Scalar={timestamp_scalar} - Timestamp Scalar={timestamp_scalar}"
+ );
+ let res1 =
+ resolve_temporal_op_scalar(&timestamp_array, -1, &timestamp_scalar, true)?;
+ let res2 = timestamp_scalar.sub(&timestamp_scalar)?.to_array();
+ assert_eq!(
+ &res1, &res2,
+ "Timestamp Scalar={timestamp_scalar} - Timestamp Scalar={timestamp_scalar}"
+ );
// interval - interval
- if let ColumnarValue::Array(res1) =
- resolve_temporal_op_scalar(&interval_array, -1, &interval_scalar)?
- {
- let res2 = interval_scalar.sub(&interval_scalar)?.to_array();
- assert_eq!(
- &res1, &res2,
- "Interval Scalar={interval_scalar} - Interval Scalar={interval_scalar}"
- );
- }
+ let res1 =
+ resolve_temporal_op_scalar(&interval_array, -1, &interval_scalar, false)?;
+ let res2 = interval_scalar.sub(&interval_scalar)?.to_array();
+ assert_eq!(
+ &res1, &res2,
+ "Interval Scalar={interval_scalar} - Interval Scalar={interval_scalar}"
+ );
+ let res1 =
+ resolve_temporal_op_scalar(&interval_array, -1, &interval_scalar, true)?;
+ let res2 = interval_scalar.sub(&interval_scalar)?.to_array();
+ assert_eq!(
+ &res1, &res2,
+ "Interval Scalar={interval_scalar} - Interval Scalar={interval_scalar}"
+ );
// interval + interval
- if let ColumnarValue::Array(res1) =
- resolve_temporal_op_scalar(&interval_array, 1, &interval_scalar)?
- {
- let res2 = interval_scalar.add(&interval_scalar)?.to_array();
- assert_eq!(
- &res1, &res2,
- "Interval Scalar={interval_scalar} + Interval Scalar={interval_scalar}"
- );
- }
+ let res1 =
+ resolve_temporal_op_scalar(&interval_array, 1, &interval_scalar, false)?;
+ let res2 = interval_scalar.add(&interval_scalar)?.to_array();
+ assert_eq!(
+ &res1, &res2,
+ "Interval Scalar={interval_scalar} + Interval Scalar={interval_scalar}"
+ );
+ let res1 =
+ resolve_temporal_op_scalar(&interval_array, 1, &interval_scalar, true)?;
+ let res2 = interval_scalar.add(&interval_scalar)?.to_array();
+ assert_eq!(
+ &res1, &res2,
+ "Interval Scalar={interval_scalar} + Interval Scalar={interval_scalar}"
+ );
Ok(())
}
@@ -732,6 +751,175 @@ mod tests {
experiment(timestamp_scalar, interval_scalar)?;
+ // More test with all matchings of timestamps and intervals
+ let timestamp_scalar = ScalarValue::TimestampSecond(
+ Some(
+ NaiveDate::from_ymd_opt(2000, 12, 31)
+ .unwrap()
+ .and_hms_opt(23, 59, 59)
+ .unwrap()
+ .timestamp(),
+ ),
+ None,
+ );
+ let interval_scalar = ScalarValue::new_interval_ym(0, 1);
+
+ experiment(timestamp_scalar, interval_scalar)?;
+
+ let timestamp_scalar = ScalarValue::TimestampSecond(
+ Some(
+ NaiveDate::from_ymd_opt(2000, 12, 31)
+ .unwrap()
+ .and_hms_opt(23, 59, 59)
+ .unwrap()
+ .timestamp(),
+ ),
+ None,
+ );
+ let interval_scalar = ScalarValue::new_interval_dt(10, 100000);
+
+ experiment(timestamp_scalar, interval_scalar)?;
+
+ let timestamp_scalar = ScalarValue::TimestampSecond(
+ Some(
+ NaiveDate::from_ymd_opt(2000, 12, 31)
+ .unwrap()
+ .and_hms_opt(23, 59, 59)
+ .unwrap()
+ .timestamp(),
+ ),
+ None,
+ );
+ let interval_scalar = ScalarValue::new_interval_mdn(13, 32, 123456);
+
+ experiment(timestamp_scalar, interval_scalar)?;
+
+ let timestamp_scalar = ScalarValue::TimestampMillisecond(
+ Some(
+ NaiveDate::from_ymd_opt(2000, 12, 31)
+ .unwrap()
+ .and_hms_milli_opt(23, 59, 59, 909)
+ .unwrap()
+ .timestamp_millis(),
+ ),
+ None,
+ );
+ let interval_scalar = ScalarValue::new_interval_ym(0, 1);
+
+ experiment(timestamp_scalar, interval_scalar)?;
+
+ let timestamp_scalar = ScalarValue::TimestampMillisecond(
+ Some(
+ NaiveDate::from_ymd_opt(2000, 12, 31)
+ .unwrap()
+ .and_hms_milli_opt(23, 59, 59, 909)
+ .unwrap()
+ .timestamp_millis(),
+ ),
+ None,
+ );
+ let interval_scalar = ScalarValue::new_interval_dt(10, 100000);
+
+ experiment(timestamp_scalar, interval_scalar)?;
+
+ let timestamp_scalar = ScalarValue::TimestampMillisecond(
+ Some(
+ NaiveDate::from_ymd_opt(2000, 12, 31)
+ .unwrap()
+ .and_hms_milli_opt(23, 59, 59, 909)
+ .unwrap()
+ .timestamp_millis(),
+ ),
+ None,
+ );
+ let interval_scalar = ScalarValue::new_interval_mdn(13, 32, 123456);
+
+ experiment(timestamp_scalar, interval_scalar)?;
+
+ let timestamp_scalar = ScalarValue::TimestampMicrosecond(
+ Some(
+ NaiveDate::from_ymd_opt(2000, 12, 31)
+ .unwrap()
+ .and_hms_micro_opt(23, 59, 59, 987654)
+ .unwrap()
+ .timestamp_micros(),
+ ),
+ None,
+ );
+ let interval_scalar = ScalarValue::new_interval_ym(0, 1);
+
+ experiment(timestamp_scalar, interval_scalar)?;
+
+ let timestamp_scalar = ScalarValue::TimestampMicrosecond(
+ Some(
+ NaiveDate::from_ymd_opt(2000, 12, 31)
+ .unwrap()
+ .and_hms_micro_opt(23, 59, 59, 987654)
+ .unwrap()
+ .timestamp_micros(),
+ ),
+ None,
+ );
+ let interval_scalar = ScalarValue::new_interval_dt(10, 100000);
+
+ experiment(timestamp_scalar, interval_scalar)?;
+
+ let timestamp_scalar = ScalarValue::TimestampMicrosecond(
+ Some(
+ NaiveDate::from_ymd_opt(2000, 12, 31)
+ .unwrap()
+ .and_hms_micro_opt(23, 59, 59, 987654)
+ .unwrap()
+ .timestamp_micros(),
+ ),
+ None,
+ );
+ let interval_scalar = ScalarValue::new_interval_mdn(13, 32, 123456);
+
+ experiment(timestamp_scalar, interval_scalar)?;
+
+ let timestamp_scalar = ScalarValue::TimestampNanosecond(
+ Some(
+ NaiveDate::from_ymd_opt(2000, 12, 31)
+ .unwrap()
+ .and_hms_nano_opt(23, 59, 59, 999999999)
+ .unwrap()
+ .timestamp_nanos(),
+ ),
+ None,
+ );
+ let interval_scalar = ScalarValue::new_interval_ym(0, 1);
+
+ experiment(timestamp_scalar, interval_scalar)?;
+
+ let timestamp_scalar = ScalarValue::TimestampNanosecond(
+ Some(
+ NaiveDate::from_ymd_opt(2000, 12, 31)
+ .unwrap()
+ .and_hms_nano_opt(23, 59, 59, 999999999)
+ .unwrap()
+ .timestamp_nanos(),
+ ),
+ None,
+ );
+ let interval_scalar = ScalarValue::new_interval_dt(10, 100000);
+
+ experiment(timestamp_scalar, interval_scalar)?;
+
+ let timestamp_scalar = ScalarValue::TimestampNanosecond(
+ Some(
+ NaiveDate::from_ymd_opt(2000, 12, 31)
+ .unwrap()
+ .and_hms_nano_opt(23, 59, 59, 999999999)
+ .unwrap()
+ .timestamp_nanos(),
+ ),
+ None,
+ );
+ let interval_scalar = ScalarValue::new_interval_mdn(13, 32, 123456);
+
+ experiment(timestamp_scalar, interval_scalar)?;
+
Ok(())
}
}