You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ak...@apache.org on 2023/05/30 14:26:19 UTC

[arrow-datafusion] branch main updated: Refactor temporal arithmetic (#6433)

This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0abd4a272f Refactor temporal arithmetic  (#6433)
0abd4a272f is described below

commit 0abd4a272f471c0d9cedf1aa65fad677f4a7717e
Author: Berkay Şahin <12...@users.noreply.github.com>
AuthorDate: Tue May 30 17:26:12 2023 +0300

    Refactor temporal arithmetic  (#6433)
    
    * Subtraction is OK; addition is still in progress
    
    * Addition is also ok for scalar vs array temporals
    
    * simplifications
    
    * unit tests are extended
    
    * Code style improvements
    
    * Code simplification
    
    * Macros written to remove duplication, unit tests added, operations moved to kernels_arrow
    
    ---------
    
    Co-authored-by: Mustafa Akur <mu...@synnada.ai>
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
 datafusion/common/src/delta.rs                     |  103 +-
 datafusion/common/src/scalar.rs                    |   18 +-
 datafusion/optimizer/src/analyzer/type_coercion.rs |    2 +-
 datafusion/physical-expr/src/expressions/binary.rs | 1012 ++-----------
 .../src/expressions/binary/kernels_arrow.rs        | 1530 +++++++++++++++++++-
 .../physical-expr/src/expressions/datetime.rs      |  290 +++-
 6 files changed, 1892 insertions(+), 1063 deletions(-)

diff --git a/datafusion/common/src/delta.rs b/datafusion/common/src/delta.rs
index cc723a9943..bb71e3eb93 100644
--- a/datafusion/common/src/delta.rs
+++ b/datafusion/common/src/delta.rs
@@ -49,7 +49,8 @@ fn normalise_day(year: i32, month: u32, day: u32) -> u32 {
 
 /// Shift a date by the given number of months.
 /// Ambiguous month-ends are shifted backwards as necessary.
-pub fn shift_months<D: Datelike>(date: D, months: i32) -> D {
+pub fn shift_months<D: Datelike>(date: D, months: i32, sign: i32) -> D {
+    let months = months * sign;
     let mut year = date.year() + (date.month() as i32 + months) / 12;
     let mut month = (date.month() as i32 + months) % 12;
     let mut day = date.day();
@@ -108,117 +109,117 @@ mod tests {
         let base = NaiveDate::from_ymd_opt(2020, 1, 31).unwrap();
 
         assert_eq!(
-            shift_months(base, 0),
+            shift_months(base, 0, 1),
             NaiveDate::from_ymd_opt(2020, 1, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, 1),
+            shift_months(base, 1, 1),
             NaiveDate::from_ymd_opt(2020, 2, 29).unwrap()
         );
         assert_eq!(
-            shift_months(base, 2),
+            shift_months(base, 2, 1),
             NaiveDate::from_ymd_opt(2020, 3, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, 3),
+            shift_months(base, 3, 1),
             NaiveDate::from_ymd_opt(2020, 4, 30).unwrap()
         );
         assert_eq!(
-            shift_months(base, 4),
+            shift_months(base, 4, 1),
             NaiveDate::from_ymd_opt(2020, 5, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, 5),
+            shift_months(base, 5, 1),
             NaiveDate::from_ymd_opt(2020, 6, 30).unwrap()
         );
         assert_eq!(
-            shift_months(base, 6),
+            shift_months(base, 6, 1),
             NaiveDate::from_ymd_opt(2020, 7, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, 7),
+            shift_months(base, 7, 1),
             NaiveDate::from_ymd_opt(2020, 8, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, 8),
+            shift_months(base, 8, 1),
             NaiveDate::from_ymd_opt(2020, 9, 30).unwrap()
         );
         assert_eq!(
-            shift_months(base, 9),
+            shift_months(base, 9, 1),
             NaiveDate::from_ymd_opt(2020, 10, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, 10),
+            shift_months(base, 10, 1),
             NaiveDate::from_ymd_opt(2020, 11, 30).unwrap()
         );
         assert_eq!(
-            shift_months(base, 11),
+            shift_months(base, 11, 1),
             NaiveDate::from_ymd_opt(2020, 12, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, 12),
+            shift_months(base, 12, 1),
             NaiveDate::from_ymd_opt(2021, 1, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, 13),
+            shift_months(base, 13, 1),
             NaiveDate::from_ymd_opt(2021, 2, 28).unwrap()
         );
 
         assert_eq!(
-            shift_months(base, -1),
+            shift_months(base, 1, -1),
             NaiveDate::from_ymd_opt(2019, 12, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, -2),
+            shift_months(base, 2, -1),
             NaiveDate::from_ymd_opt(2019, 11, 30).unwrap()
         );
         assert_eq!(
-            shift_months(base, -3),
+            shift_months(base, 3, -1),
             NaiveDate::from_ymd_opt(2019, 10, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, -4),
+            shift_months(base, 4, -1),
             NaiveDate::from_ymd_opt(2019, 9, 30).unwrap()
         );
         assert_eq!(
-            shift_months(base, -5),
+            shift_months(base, 5, -1),
             NaiveDate::from_ymd_opt(2019, 8, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, -6),
+            shift_months(base, 6, -1),
             NaiveDate::from_ymd_opt(2019, 7, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, -7),
+            shift_months(base, 7, -1),
             NaiveDate::from_ymd_opt(2019, 6, 30).unwrap()
         );
         assert_eq!(
-            shift_months(base, -8),
+            shift_months(base, 8, -1),
             NaiveDate::from_ymd_opt(2019, 5, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, -9),
+            shift_months(base, 9, -1),
             NaiveDate::from_ymd_opt(2019, 4, 30).unwrap()
         );
         assert_eq!(
-            shift_months(base, -10),
+            shift_months(base, 10, -1),
             NaiveDate::from_ymd_opt(2019, 3, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, -11),
+            shift_months(base, 11, -1),
             NaiveDate::from_ymd_opt(2019, 2, 28).unwrap()
         );
         assert_eq!(
-            shift_months(base, -12),
+            shift_months(base, 12, -1),
             NaiveDate::from_ymd_opt(2019, 1, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, -13),
+            shift_months(base, 13, -1),
             NaiveDate::from_ymd_opt(2018, 12, 31).unwrap()
         );
 
         assert_eq!(
-            shift_months(base, 1265),
+            shift_months(base, 1265, 1),
             NaiveDate::from_ymd_opt(2125, 6, 30).unwrap()
         );
     }
@@ -227,42 +228,42 @@ mod tests {
     fn test_shift_months_with_overflow() {
         let base = NaiveDate::from_ymd_opt(2020, 12, 31).unwrap();
 
-        assert_eq!(shift_months(base, 0), base);
+        assert_eq!(shift_months(base, 0, 1), base);
         assert_eq!(
-            shift_months(base, 1),
+            shift_months(base, 1, 1),
             NaiveDate::from_ymd_opt(2021, 1, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, 2),
+            shift_months(base, 2, 1),
             NaiveDate::from_ymd_opt(2021, 2, 28).unwrap()
         );
         assert_eq!(
-            shift_months(base, 12),
+            shift_months(base, 12, 1),
             NaiveDate::from_ymd_opt(2021, 12, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, 18),
+            shift_months(base, 18, 1),
             NaiveDate::from_ymd_opt(2022, 6, 30).unwrap()
         );
 
         assert_eq!(
-            shift_months(base, -1),
+            shift_months(base, 1, -1),
             NaiveDate::from_ymd_opt(2020, 11, 30).unwrap()
         );
         assert_eq!(
-            shift_months(base, -2),
+            shift_months(base, 2, -1),
             NaiveDate::from_ymd_opt(2020, 10, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, -10),
+            shift_months(base, 10, -1),
             NaiveDate::from_ymd_opt(2020, 2, 29).unwrap()
         );
         assert_eq!(
-            shift_months(base, -12),
+            shift_months(base, 12, -1),
             NaiveDate::from_ymd_opt(2019, 12, 31).unwrap()
         );
         assert_eq!(
-            shift_months(base, -18),
+            shift_months(base, 18, -1),
             NaiveDate::from_ymd_opt(2019, 6, 30).unwrap()
         );
     }
@@ -275,61 +276,61 @@ mod tests {
         let base = NaiveDateTime::new(date, o_clock);
 
         assert_eq!(
-            shift_months(base, 0).date(),
+            shift_months(base, 0, 1).date(),
             NaiveDate::from_ymd_opt(2020, 1, 31).unwrap(),
         );
         assert_eq!(
-            shift_months(base, 1).date(),
+            shift_months(base, 1, 1).date(),
             NaiveDate::from_ymd_opt(2020, 2, 29).unwrap(),
         );
         assert_eq!(
-            shift_months(base, 2).date(),
+            shift_months(base, 2, 1).date(),
             NaiveDate::from_ymd_opt(2020, 3, 31).unwrap(),
         );
-        assert_eq!(shift_months(base, 0).time(), o_clock);
-        assert_eq!(shift_months(base, 1).time(), o_clock);
-        assert_eq!(shift_months(base, 2).time(), o_clock);
+        assert_eq!(shift_months(base, 0, 1).time(), o_clock);
+        assert_eq!(shift_months(base, 1, 1).time(), o_clock);
+        assert_eq!(shift_months(base, 2, 1).time(), o_clock);
     }
 
     #[test]
     fn add_11_months() {
         let prior = NaiveDate::from_ymd_opt(2000, 1, 1).unwrap();
-        let actual = shift_months(prior, 11);
+        let actual = shift_months(prior, 11, 1);
         assert_eq!(format!("{actual:?}").as_str(), "2000-12-01");
     }
 
     #[test]
     fn add_12_months() {
         let prior = NaiveDate::from_ymd_opt(2000, 1, 1).unwrap();
-        let actual = shift_months(prior, 12);
+        let actual = shift_months(prior, 12, 1);
         assert_eq!(format!("{actual:?}").as_str(), "2001-01-01");
     }
 
     #[test]
     fn add_13_months() {
         let prior = NaiveDate::from_ymd_opt(2000, 1, 1).unwrap();
-        let actual = shift_months(prior, 13);
+        let actual = shift_months(prior, 13, 1);
         assert_eq!(format!("{actual:?}").as_str(), "2001-02-01");
     }
 
     #[test]
     fn sub_11_months() {
         let prior = NaiveDate::from_ymd_opt(2000, 1, 1).unwrap();
-        let actual = shift_months(prior, -11);
+        let actual = shift_months(prior, 11, -1);
         assert_eq!(format!("{actual:?}").as_str(), "1999-02-01");
     }
 
     #[test]
     fn sub_12_months() {
         let prior = NaiveDate::from_ymd_opt(2000, 1, 1).unwrap();
-        let actual = shift_months(prior, -12);
+        let actual = shift_months(prior, 12, -1);
         assert_eq!(format!("{actual:?}").as_str(), "1999-01-01");
     }
 
     #[test]
     fn sub_13_months() {
         let prior = NaiveDate::from_ymd_opt(2000, 1, 1).unwrap();
-        let actual = shift_months(prior, -13);
+        let actual = shift_months(prior, 13, -1);
         assert_eq!(format!("{actual:?}").as_str(), "1998-12-01");
     }
 }
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 13f9d9f900..724b21787f 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -907,11 +907,11 @@ macro_rules! impl_op_arithmetic {
             )))),
             // Binary operations on arguments with different types:
             (ScalarValue::Date32(Some(days)), _) => {
-                let value = date32_add(*days, $RHS, get_sign!($OPERATION))?;
+                let value = date32_op(*days, $RHS, get_sign!($OPERATION))?;
                 Ok(ScalarValue::Date32(Some(value)))
             }
             (ScalarValue::Date64(Some(ms)), _) => {
-                let value = date64_add(*ms, $RHS, get_sign!($OPERATION))?;
+                let value = date64_op(*ms, $RHS, get_sign!($OPERATION))?;
                 Ok(ScalarValue::Date64(Some(value)))
             }
             (ScalarValue::TimestampSecond(Some(ts_s), zone), _) => {
@@ -1247,14 +1247,14 @@ pub fn calculate_naives<const TIME_MODE: bool>(
 }
 
 #[inline]
-pub fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {
+pub fn date32_op(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {
     let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
     let prior = epoch.add(Duration::days(days as i64));
     do_date_math(prior, scalar, sign).map(|d| d.sub(epoch).num_days() as i32)
 }
 
 #[inline]
-pub fn date64_add(ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+pub fn date64_op(ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
     let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
     let prior = epoch.add(Duration::milliseconds(ms));
     do_date_math(prior, scalar, sign).map(|d| d.sub(epoch).num_milliseconds())
@@ -1398,7 +1398,7 @@ where
 {
     Ok(match scalar {
         ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
-        ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * sign),
+        ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i, sign),
         ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign),
         other => Err(DataFusionError::Execution(format!(
             "DateIntervalExpr does not support non-interval type {other:?}"
@@ -1415,7 +1415,7 @@ where
     D: Datelike + Add<Duration, Output = D>,
 {
     Ok(match INTERVAL_MODE {
-        YM_MODE => shift_months(prior, interval as i32 * sign),
+        YM_MODE => shift_months(prior, interval as i32, sign),
         DT_MODE => add_day_time(prior, interval as i64, sign),
         MDN_MODE => add_m_d_nano(prior, interval, sign),
         _ => {
@@ -1427,7 +1427,7 @@ where
 }
 
 // Can remove once chrono:0.4.23 is released
-fn add_m_d_nano<D>(prior: D, interval: i128, sign: i32) -> D
+pub fn add_m_d_nano<D>(prior: D, interval: i128, sign: i32) -> D
 where
     D: Datelike + Add<Duration, Output = D>,
 {
@@ -1435,13 +1435,13 @@ where
     let months = months * sign;
     let days = days * sign;
     let nanos = nanos * sign as i64;
-    let a = shift_months(prior, months);
+    let a = shift_months(prior, months, 1);
     let b = a.add(Duration::days(days as i64));
     b.add(Duration::nanoseconds(nanos))
 }
 
 // Can remove once chrono:0.4.23 is released
-fn add_day_time<D>(prior: D, interval: i64, sign: i32) -> D
+pub fn add_day_time<D>(prior: D, interval: i64, sign: i32) -> D
 where
     D: Datelike + Add<Duration, Output = D>,
 {
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 1761d237c3..da033a54ae 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -986,7 +986,7 @@ mod test {
     }
 
     #[test]
-    fn binary_op_date32_add_interval() -> Result<()> {
+    fn binary_op_date32_op_interval() -> Result<()> {
         //CAST(Utf8("1998-03-18") AS Date32) + IntervalDayTime("386547056640")
         let expr = cast(lit("1998-03-18"), DataType::Date32)
             + lit(ScalarValue::IntervalDayTime(Some(386547056640)));
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index 295d5fe655..e5b66d4a39 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -21,8 +21,6 @@ mod kernels_arrow;
 
 use std::{any::Any, sync::Arc};
 
-use chrono::NaiveDateTime;
-
 use arrow::array::*;
 use arrow::compute::kernels::arithmetic::{
     add_dyn, add_scalar_dyn as add_dyn_scalar, divide_dyn_opt,
@@ -50,17 +48,12 @@ use arrow::compute::kernels::comparison::{
     eq_dyn_utf8_scalar, gt_dyn_utf8_scalar, gt_eq_dyn_utf8_scalar, lt_dyn_utf8_scalar,
     lt_eq_dyn_utf8_scalar, neq_dyn_utf8_scalar,
 };
-use arrow::compute::{cast, try_unary, unary, CastOptions};
+use arrow::compute::{cast, CastOptions};
 use arrow::datatypes::*;
 
 use adapter::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn};
 use arrow::compute::kernels::concat_elements::concat_elements_utf8;
-use datafusion_common::scalar::{
-    calculate_naives, microseconds_add, microseconds_sub, milliseconds_add,
-    milliseconds_sub, nanoseconds_add, nanoseconds_sub, op_dt, op_dt_mdn, op_mdn, op_ym,
-    op_ym_dt, op_ym_mdn, parse_timezones, seconds_add, seconds_sub, MILLISECOND_MODE,
-    NANOSECOND_MODE,
-};
+
 use datafusion_expr::type_coercion::{is_decimal, is_timestamp, is_utf8_or_large_utf8};
 use kernels::{
     bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar,
@@ -68,21 +61,25 @@ use kernels::{
     bitwise_shift_right_dyn_scalar, bitwise_xor_dyn, bitwise_xor_dyn_scalar,
 };
 use kernels_arrow::{
-    add_decimal_dyn_scalar, add_dyn_decimal, add_dyn_temporal, add_dyn_temporal_scalar,
-    divide_decimal_dyn_scalar, divide_dyn_opt_decimal, is_distinct_from,
-    is_distinct_from_binary, is_distinct_from_bool, is_distinct_from_decimal,
-    is_distinct_from_f32, is_distinct_from_f64, is_distinct_from_null,
-    is_distinct_from_utf8, is_not_distinct_from, is_not_distinct_from_binary,
-    is_not_distinct_from_bool, is_not_distinct_from_decimal, is_not_distinct_from_f32,
-    is_not_distinct_from_f64, is_not_distinct_from_null, is_not_distinct_from_utf8,
-    modulus_decimal_dyn_scalar, modulus_dyn_decimal, multiply_decimal_dyn_scalar,
-    multiply_dyn_decimal, subtract_decimal_dyn_scalar, subtract_dyn_decimal,
-    subtract_dyn_temporal, subtract_dyn_temporal_scalar,
+    add_decimal_dyn_scalar, add_dyn_decimal, add_dyn_temporal, divide_decimal_dyn_scalar,
+    divide_dyn_opt_decimal, is_distinct_from, is_distinct_from_binary,
+    is_distinct_from_bool, is_distinct_from_decimal, is_distinct_from_f32,
+    is_distinct_from_f64, is_distinct_from_null, is_distinct_from_utf8,
+    is_not_distinct_from, is_not_distinct_from_binary, is_not_distinct_from_bool,
+    is_not_distinct_from_decimal, is_not_distinct_from_f32, is_not_distinct_from_f64,
+    is_not_distinct_from_null, is_not_distinct_from_utf8, modulus_decimal_dyn_scalar,
+    modulus_dyn_decimal, multiply_decimal_dyn_scalar, multiply_dyn_decimal,
+    subtract_decimal_dyn_scalar, subtract_dyn_decimal, subtract_dyn_temporal,
 };
 
 use arrow::datatypes::{DataType, Schema, TimeUnit};
 use arrow::record_batch::RecordBatch;
 
+use self::kernels_arrow::{
+    add_dyn_temporal_left_scalar, add_dyn_temporal_right_scalar,
+    subtract_dyn_temporal_left_scalar, subtract_dyn_temporal_right_scalar,
+};
+
 use super::column::Column;
 use crate::expressions::cast_column;
 use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
@@ -90,12 +87,7 @@ use crate::intervals::{apply_operator, Interval};
 use crate::physical_expr::down_cast_any_ref;
 use crate::{analysis_expect, AnalysisContext, ExprBoundaries, PhysicalExpr};
 use datafusion_common::cast::as_boolean_array;
-use datafusion_common::cast::{
-    as_interval_dt_array, as_interval_mdn_array, as_interval_ym_array,
-    as_timestamp_microsecond_array, as_timestamp_millisecond_array,
-    as_timestamp_nanosecond_array, as_timestamp_second_array,
-};
-use datafusion_common::scalar::*;
+
 use datafusion_common::ScalarValue;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::type_coercion::binary::{
@@ -1089,84 +1081,69 @@ impl BinaryExpr {
         scalar: ScalarValue,
         result_type: &DataType,
     ) -> Result<Option<Result<ArrayRef>>> {
+        use Operator::*;
         let bool_type = &DataType::Boolean;
         let scalar_result = match &self.op {
-            Operator::Lt => {
-                binary_array_op_dyn_scalar!(array, scalar, lt, bool_type)
-            }
-            Operator::LtEq => {
-                binary_array_op_dyn_scalar!(array, scalar, lt_eq, bool_type)
-            }
-            Operator::Gt => {
-                binary_array_op_dyn_scalar!(array, scalar, gt, bool_type)
-            }
-            Operator::GtEq => {
-                binary_array_op_dyn_scalar!(array, scalar, gt_eq, bool_type)
-            }
-            Operator::Eq => {
-                binary_array_op_dyn_scalar!(array, scalar, eq, bool_type)
-            }
-            Operator::NotEq => {
-                binary_array_op_dyn_scalar!(array, scalar, neq, bool_type)
-            }
-            Operator::Plus => {
+            Lt => binary_array_op_dyn_scalar!(array, scalar, lt, bool_type),
+            LtEq => binary_array_op_dyn_scalar!(array, scalar, lt_eq, bool_type),
+            Gt => binary_array_op_dyn_scalar!(array, scalar, gt, bool_type),
+            GtEq => binary_array_op_dyn_scalar!(array, scalar, gt_eq, bool_type),
+            Eq => binary_array_op_dyn_scalar!(array, scalar, eq, bool_type),
+            NotEq => binary_array_op_dyn_scalar!(array, scalar, neq, bool_type),
+            Plus => {
                 binary_primitive_array_op_dyn_scalar!(array, scalar, add, result_type)
             }
-            Operator::Minus => {
-                binary_primitive_array_op_dyn_scalar!(
-                    array,
-                    scalar,
-                    subtract,
-                    result_type
-                )
-            }
-            Operator::Multiply => {
-                binary_primitive_array_op_dyn_scalar!(
-                    array,
-                    scalar,
-                    multiply,
-                    result_type
-                )
-            }
-            Operator::Divide => {
+            Minus => binary_primitive_array_op_dyn_scalar!(
+                array,
+                scalar,
+                subtract,
+                result_type
+            ),
+            Multiply => binary_primitive_array_op_dyn_scalar!(
+                array,
+                scalar,
+                multiply,
+                result_type
+            ),
+            Divide => {
                 binary_primitive_array_op_dyn_scalar!(array, scalar, divide, result_type)
             }
-            Operator::Modulo => {
+            Modulo => {
                 binary_primitive_array_op_dyn_scalar!(array, scalar, modulus, result_type)
             }
-            Operator::RegexMatch => binary_string_array_flag_op_scalar!(
+            RegexMatch => binary_string_array_flag_op_scalar!(
                 array,
                 scalar,
                 regexp_is_match,
                 false,
                 false
             ),
-            Operator::RegexIMatch => binary_string_array_flag_op_scalar!(
+            RegexIMatch => binary_string_array_flag_op_scalar!(
                 array,
                 scalar,
                 regexp_is_match,
                 false,
                 true
             ),
-            Operator::RegexNotMatch => binary_string_array_flag_op_scalar!(
+            RegexNotMatch => binary_string_array_flag_op_scalar!(
                 array,
                 scalar,
                 regexp_is_match,
                 true,
                 false
             ),
-            Operator::RegexNotIMatch => binary_string_array_flag_op_scalar!(
+            RegexNotIMatch => binary_string_array_flag_op_scalar!(
                 array,
                 scalar,
                 regexp_is_match,
                 true,
                 true
             ),
-            Operator::BitwiseAnd => bitwise_and_dyn_scalar(array, scalar),
-            Operator::BitwiseOr => bitwise_or_dyn_scalar(array, scalar),
-            Operator::BitwiseXor => bitwise_xor_dyn_scalar(array, scalar),
-            Operator::BitwiseShiftRight => bitwise_shift_right_dyn_scalar(array, scalar),
-            Operator::BitwiseShiftLeft => bitwise_shift_left_dyn_scalar(array, scalar),
+            BitwiseAnd => bitwise_and_dyn_scalar(array, scalar),
+            BitwiseOr => bitwise_or_dyn_scalar(array, scalar),
+            BitwiseXor => bitwise_xor_dyn_scalar(array, scalar),
+            BitwiseShiftRight => bitwise_shift_right_dyn_scalar(array, scalar),
+            BitwiseShiftLeft => bitwise_shift_left_dyn_scalar(array, scalar),
             // if scalar operation is not supported - fallback to array implementation
             _ => None,
         };
@@ -1181,26 +1158,15 @@ impl BinaryExpr {
         scalar: ScalarValue,
         array: &ArrayRef,
     ) -> Result<Option<Result<ArrayRef>>> {
+        use Operator::*;
         let bool_type = &DataType::Boolean;
         let scalar_result = match &self.op {
-            Operator::Lt => {
-                binary_array_op_dyn_scalar!(array, scalar, gt, bool_type)
-            }
-            Operator::LtEq => {
-                binary_array_op_dyn_scalar!(array, scalar, gt_eq, bool_type)
-            }
-            Operator::Gt => {
-                binary_array_op_dyn_scalar!(array, scalar, lt, bool_type)
-            }
-            Operator::GtEq => {
-                binary_array_op_dyn_scalar!(array, scalar, lt_eq, bool_type)
-            }
-            Operator::Eq => {
-                binary_array_op_dyn_scalar!(array, scalar, eq, bool_type)
-            }
-            Operator::NotEq => {
-                binary_array_op_dyn_scalar!(array, scalar, neq, bool_type)
-            }
+            Lt => binary_array_op_dyn_scalar!(array, scalar, gt, bool_type),
+            LtEq => binary_array_op_dyn_scalar!(array, scalar, gt_eq, bool_type),
+            Gt => binary_array_op_dyn_scalar!(array, scalar, lt, bool_type),
+            GtEq => binary_array_op_dyn_scalar!(array, scalar, lt_eq, bool_type),
+            Eq => binary_array_op_dyn_scalar!(array, scalar, eq, bool_type),
+            NotEq => binary_array_op_dyn_scalar!(array, scalar, neq, bool_type),
             // if scalar operation is not supported - fallback to array implementation
             _ => None,
         };
@@ -1215,14 +1181,15 @@ impl BinaryExpr {
         right_data_type: &DataType,
         result_type: &DataType,
     ) -> Result<ArrayRef> {
+        use Operator::*;
         match &self.op {
-            Operator::Lt => lt_dyn(&left, &right),
-            Operator::LtEq => lt_eq_dyn(&left, &right),
-            Operator::Gt => gt_dyn(&left, &right),
-            Operator::GtEq => gt_eq_dyn(&left, &right),
-            Operator::Eq => eq_dyn(&left, &right),
-            Operator::NotEq => neq_dyn(&left, &right),
-            Operator::IsDistinctFrom => {
+            Lt => lt_dyn(&left, &right),
+            LtEq => lt_eq_dyn(&left, &right),
+            Gt => gt_dyn(&left, &right),
+            GtEq => gt_eq_dyn(&left, &right),
+            Eq => eq_dyn(&left, &right),
+            NotEq => neq_dyn(&left, &right),
+            IsDistinctFrom => {
                 match (left_data_type, right_data_type) {
                     // exchange lhs and rhs when lhs is Null, since `binary_array_op` is
                     // always try to down cast array according to $LEFT expression.
@@ -1232,25 +1199,21 @@ impl BinaryExpr {
                     _ => binary_array_op!(left, right, is_distinct_from),
                 }
             }
-            Operator::IsNotDistinctFrom => {
-                binary_array_op!(left, right, is_not_distinct_from)
-            }
-            Operator::Plus => {
-                binary_primitive_array_op_dyn!(left, right, add_dyn, result_type)
-            }
-            Operator::Minus => {
+            IsNotDistinctFrom => binary_array_op!(left, right, is_not_distinct_from),
+            Plus => binary_primitive_array_op_dyn!(left, right, add_dyn, result_type),
+            Minus => {
                 binary_primitive_array_op_dyn!(left, right, subtract_dyn, result_type)
             }
-            Operator::Multiply => {
+            Multiply => {
                 binary_primitive_array_op_dyn!(left, right, multiply_dyn, result_type)
             }
-            Operator::Divide => {
+            Divide => {
                 binary_primitive_array_op_dyn!(left, right, divide_dyn_opt, result_type)
             }
-            Operator::Modulo => {
+            Modulo => {
                 binary_primitive_array_op_dyn!(left, right, modulus_dyn, result_type)
             }
-            Operator::And => {
+            And => {
                 if left_data_type == &DataType::Boolean {
                     boolean_op!(&left, &right, and_kleene)
                 } else {
@@ -1262,7 +1225,7 @@ impl BinaryExpr {
                     )))
                 }
             }
-            Operator::Or => {
+            Or => {
                 if left_data_type == &DataType::Boolean {
                     boolean_op!(&left, &right, or_kleene)
                 } else {
@@ -1272,24 +1235,24 @@ impl BinaryExpr {
                     )))
                 }
             }
-            Operator::RegexMatch => {
+            RegexMatch => {
                 binary_string_array_flag_op!(left, right, regexp_is_match, false, false)
             }
-            Operator::RegexIMatch => {
+            RegexIMatch => {
                 binary_string_array_flag_op!(left, right, regexp_is_match, false, true)
             }
-            Operator::RegexNotMatch => {
+            RegexNotMatch => {
                 binary_string_array_flag_op!(left, right, regexp_is_match, true, false)
             }
-            Operator::RegexNotIMatch => {
+            RegexNotIMatch => {
                 binary_string_array_flag_op!(left, right, regexp_is_match, true, true)
             }
-            Operator::BitwiseAnd => bitwise_and_dyn(left, right),
-            Operator::BitwiseOr => bitwise_or_dyn(left, right),
-            Operator::BitwiseXor => bitwise_xor_dyn(left, right),
-            Operator::BitwiseShiftRight => bitwise_shift_right_dyn(left, right),
-            Operator::BitwiseShiftLeft => bitwise_shift_left_dyn(left, right),
-            Operator::StringConcat => {
+            BitwiseAnd => bitwise_and_dyn(left, right),
+            BitwiseOr => bitwise_or_dyn(left, right),
+            BitwiseXor => bitwise_xor_dyn(left, right),
+            BitwiseShiftRight => bitwise_shift_right_dyn(left, right),
+            BitwiseShiftLeft => bitwise_shift_left_dyn(left, right),
+            StringConcat => {
                 binary_string_array_op!(left, right, concat_elements)
             }
         }
@@ -1322,25 +1285,6 @@ pub fn binary(
     Ok(Arc::new(BinaryExpr::new(lhs, op, rhs)))
 }
 
-macro_rules! sub_timestamp_macro {
-    ($array:expr, $rhs:expr, $caster:expr, $interval_type:ty, $opt_tz_lhs:expr, $multiplier:expr,
-        $opt_tz_rhs:expr, $unit_sub:expr, $naive_sub_fn:expr, $counter:expr) => {{
-        let prim_array = $caster(&$array)?;
-        let ret: PrimitiveArray<$interval_type> = try_unary(prim_array, |lhs| {
-            let (parsed_lhs_tz, parsed_rhs_tz) =
-                (parse_timezones($opt_tz_lhs)?, parse_timezones($opt_tz_rhs)?);
-            let (naive_lhs, naive_rhs) = calculate_naives::<$unit_sub>(
-                lhs.mul_wrapping($multiplier),
-                parsed_lhs_tz,
-                $rhs.mul_wrapping($multiplier),
-                parsed_rhs_tz,
-            )?;
-            Ok($naive_sub_fn($counter(&naive_lhs), $counter(&naive_rhs)))
-        })?;
-        Arc::new(ret) as ArrayRef
-    }};
-}
-
 pub fn resolve_temporal_op(
     lhs: &ArrayRef,
     sign: i32,
@@ -1356,795 +1300,19 @@ pub fn resolve_temporal_op(
 }
 
 pub fn resolve_temporal_op_scalar(
-    lhs: &ArrayRef,
-    sign: i32,
-    rhs: &ScalarValue,
-) -> Result<ColumnarValue> {
-    match sign {
-        1 => add_dyn_temporal_scalar(lhs, rhs),
-        -1 => subtract_dyn_temporal_scalar(lhs, rhs),
-        other => Err(DataFusionError::Internal(format!(
-            "Undefined operation for temporal types {other}"
-        ))),
-    }
-}
-
-/// This function handles the Timestamp - Timestamp operations,
-/// where the first one is an array, and the second one is a scalar,
-/// hence the result is also an array.
-pub fn ts_scalar_ts_op(array: &ArrayRef, scalar: &ScalarValue) -> Result<ColumnarValue> {
-    let ret = match (array.data_type(), scalar) {
-        (
-            DataType::Timestamp(TimeUnit::Second, opt_tz_lhs),
-            ScalarValue::TimestampSecond(Some(rhs), opt_tz_rhs),
-        ) => {
-            sub_timestamp_macro!(
-                array,
-                rhs,
-                as_timestamp_second_array,
-                IntervalDayTimeType,
-                opt_tz_lhs.as_deref(),
-                1000,
-                opt_tz_rhs.as_deref(),
-                MILLISECOND_MODE,
-                seconds_sub,
-                NaiveDateTime::timestamp
-            )
-        }
-        (
-            DataType::Timestamp(TimeUnit::Millisecond, opt_tz_lhs),
-            ScalarValue::TimestampMillisecond(Some(rhs), opt_tz_rhs),
-        ) => {
-            sub_timestamp_macro!(
-                array,
-                rhs,
-                as_timestamp_millisecond_array,
-                IntervalDayTimeType,
-                opt_tz_lhs.as_deref(),
-                1,
-                opt_tz_rhs.as_deref(),
-                MILLISECOND_MODE,
-                milliseconds_sub,
-                NaiveDateTime::timestamp_millis
-            )
-        }
-        (
-            DataType::Timestamp(TimeUnit::Microsecond, opt_tz_lhs),
-            ScalarValue::TimestampMicrosecond(Some(rhs), opt_tz_rhs),
-        ) => {
-            sub_timestamp_macro!(
-                array,
-                rhs,
-                as_timestamp_microsecond_array,
-                IntervalMonthDayNanoType,
-                opt_tz_lhs.as_deref(),
-                1000,
-                opt_tz_rhs.as_deref(),
-                NANOSECOND_MODE,
-                microseconds_sub,
-                NaiveDateTime::timestamp_micros
-            )
-        }
-        (
-            DataType::Timestamp(TimeUnit::Nanosecond, opt_tz_lhs),
-            ScalarValue::TimestampNanosecond(Some(rhs), opt_tz_rhs),
-        ) => {
-            sub_timestamp_macro!(
-                array,
-                rhs,
-                as_timestamp_nanosecond_array,
-                IntervalMonthDayNanoType,
-                opt_tz_lhs.as_deref(),
-                1,
-                opt_tz_rhs.as_deref(),
-                NANOSECOND_MODE,
-                nanoseconds_sub,
-                NaiveDateTime::timestamp_nanos
-            )
-        }
-        (_, _) => {
-            return Err(DataFusionError::Internal(format!(
-                "Invalid array - scalar types for Timestamp subtraction: {:?} - {:?}",
-                array.data_type(),
-                scalar.get_datatype()
-            )));
-        }
-    };
-    Ok(ColumnarValue::Array(ret))
-}
-
-macro_rules! sub_timestamp_interval_macro {
-    ($array:expr, $as_timestamp:expr, $ts_type:ty, $fn_op:expr, $scalar:expr, $sign:expr, $tz:expr) => {{
-        let array = $as_timestamp(&$array)?;
-        let ret: PrimitiveArray<$ts_type> =
-            try_unary::<$ts_type, _, $ts_type>(array, |ts_s| {
-                Ok($fn_op(ts_s, $scalar, $sign)?)
-            })?;
-        Arc::new(ret.with_timezone_opt($tz.clone())) as ArrayRef
-    }};
-}
-/// This function handles the Timestamp - Interval operations,
-/// where the first one is an array, and the second one is a scalar,
-/// hence the result is also an array.
-pub fn ts_scalar_interval_op(
-    array: &ArrayRef,
-    sign: i32,
-    scalar: &ScalarValue,
-) -> Result<ColumnarValue> {
-    let ret = match array.data_type() {
-        DataType::Timestamp(TimeUnit::Second, tz) => {
-            sub_timestamp_interval_macro!(
-                array,
-                as_timestamp_second_array,
-                TimestampSecondType,
-                seconds_add,
-                scalar,
-                sign,
-                tz
-            )
-        }
-        DataType::Timestamp(TimeUnit::Millisecond, tz) => {
-            sub_timestamp_interval_macro!(
-                array,
-                as_timestamp_millisecond_array,
-                TimestampMillisecondType,
-                milliseconds_add,
-                scalar,
-                sign,
-                tz
-            )
-        }
-        DataType::Timestamp(TimeUnit::Microsecond, tz) => {
-            sub_timestamp_interval_macro!(
-                array,
-                as_timestamp_microsecond_array,
-                TimestampMicrosecondType,
-                microseconds_add,
-                scalar,
-                sign,
-                tz
-            )
-        }
-        DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
-            sub_timestamp_interval_macro!(
-                array,
-                as_timestamp_nanosecond_array,
-                TimestampNanosecondType,
-                nanoseconds_add,
-                scalar,
-                sign,
-                tz
-            )
-        }
-        _ => Err(DataFusionError::Internal(format!(
-            "Invalid lhs type for Timestamp vs Interval operations: {}",
-            array.data_type()
-        )))?,
-    };
-    Ok(ColumnarValue::Array(ret))
-}
-
-macro_rules! sub_interval_macro {
-    ($array:expr, $as_interval:expr, $interval_type:ty, $fn_op:expr, $scalar:expr, $sign:expr) => {{
-        let array = $as_interval(&$array)?;
-        let ret: PrimitiveArray<$interval_type> =
-            unary(array, |lhs| $fn_op(lhs, *$scalar, $sign));
-        Arc::new(ret) as ArrayRef
-    }};
-}
-macro_rules! sub_interval_cross_macro {
-    ($array:expr, $as_interval:expr, $commute:expr, $fn_op:expr, $scalar:expr, $sign:expr, $t1:ty, $t2:ty) => {{
-        let array = $as_interval(&$array)?;
-        let ret: PrimitiveArray<IntervalMonthDayNanoType> = if $commute {
-            unary(array, |lhs| {
-                $fn_op(*$scalar as $t1, lhs as $t2, $sign, $commute)
-            })
-        } else {
-            unary(array, |lhs| {
-                $fn_op(lhs as $t1, *$scalar as $t2, $sign, $commute)
-            })
-        };
-        Arc::new(ret) as ArrayRef
-    }};
-}
-/// This function handles the Interval - Interval operations,
-/// where the first one is an array, and the second one is a scalar,
-/// hence the result is also an interval array.
-pub fn interval_scalar_interval_op(
-    array: &ArrayRef,
+    arr: &ArrayRef,
     sign: i32,
     scalar: &ScalarValue,
-) -> Result<ColumnarValue> {
-    let ret = match (array.data_type(), scalar) {
-        (
-            DataType::Interval(IntervalUnit::YearMonth),
-            ScalarValue::IntervalYearMonth(Some(rhs)),
-        ) => {
-            sub_interval_macro!(
-                array,
-                as_interval_ym_array,
-                IntervalYearMonthType,
-                op_ym,
-                rhs,
-                sign
-            )
-        }
-        (
-            DataType::Interval(IntervalUnit::YearMonth),
-            ScalarValue::IntervalDayTime(Some(rhs)),
-        ) => {
-            sub_interval_cross_macro!(
-                array,
-                as_interval_ym_array,
-                false,
-                op_ym_dt,
-                rhs,
-                sign,
-                i32,
-                i64
-            )
-        }
-        (
-            DataType::Interval(IntervalUnit::YearMonth),
-            ScalarValue::IntervalMonthDayNano(Some(rhs)),
-        ) => {
-            sub_interval_cross_macro!(
-                array,
-                as_interval_ym_array,
-                false,
-                op_ym_mdn,
-                rhs,
-                sign,
-                i32,
-                i128
-            )
-        }
-        (
-            DataType::Interval(IntervalUnit::DayTime),
-            ScalarValue::IntervalYearMonth(Some(rhs)),
-        ) => {
-            sub_interval_cross_macro!(
-                array,
-                as_interval_dt_array,
-                true,
-                op_ym_dt,
-                rhs,
-                sign,
-                i32,
-                i64
-            )
-        }
-        (
-            DataType::Interval(IntervalUnit::DayTime),
-            ScalarValue::IntervalDayTime(Some(rhs)),
-        ) => {
-            sub_interval_macro!(
-                array,
-                as_interval_dt_array,
-                IntervalDayTimeType,
-                op_dt,
-                rhs,
-                sign
-            )
-        }
-        (
-            DataType::Interval(IntervalUnit::DayTime),
-            ScalarValue::IntervalMonthDayNano(Some(rhs)),
-        ) => {
-            sub_interval_cross_macro!(
-                array,
-                as_interval_dt_array,
-                false,
-                op_dt_mdn,
-                rhs,
-                sign,
-                i64,
-                i128
-            )
-        }
-        (
-            DataType::Interval(IntervalUnit::MonthDayNano),
-            ScalarValue::IntervalYearMonth(Some(rhs)),
-        ) => {
-            sub_interval_cross_macro!(
-                array,
-                as_interval_mdn_array,
-                true,
-                op_ym_mdn,
-                rhs,
-                sign,
-                i32,
-                i128
-            )
-        }
-        (
-            DataType::Interval(IntervalUnit::MonthDayNano),
-            ScalarValue::IntervalDayTime(Some(rhs)),
-        ) => {
-            sub_interval_cross_macro!(
-                array,
-                as_interval_mdn_array,
-                true,
-                op_dt_mdn,
-                rhs,
-                sign,
-                i64,
-                i128
-            )
-        }
-        (
-            DataType::Interval(IntervalUnit::MonthDayNano),
-            ScalarValue::IntervalMonthDayNano(Some(rhs)),
-        ) => {
-            sub_interval_macro!(
-                array,
-                as_interval_mdn_array,
-                IntervalMonthDayNanoType,
-                op_mdn,
-                rhs,
-                sign
-            )
-        }
-        _ => Err(DataFusionError::Internal(format!(
-            "Invalid operands for Interval vs Interval operations: {} - {}",
-            array.data_type(),
-            scalar.get_datatype(),
-        )))?,
-    };
-    Ok(ColumnarValue::Array(ret))
-}
-
-// Macros related with timestamp & interval operations
-macro_rules! ts_sub_op {
-    ($lhs:ident, $rhs:ident, $lhs_tz:ident, $rhs_tz:ident, $coef:expr, $caster:expr, $op:expr, $ts_unit:expr, $mode:expr, $type_out:ty) => {{
-        let prim_array_lhs = $caster(&$lhs)?;
-        let prim_array_rhs = $caster(&$rhs)?;
-        let ret: PrimitiveArray<$type_out> =
-            arrow::compute::try_binary(prim_array_lhs, prim_array_rhs, |ts1, ts2| {
-                let (parsed_lhs_tz, parsed_rhs_tz) = (
-                    parse_timezones($lhs_tz.as_deref())?,
-                    parse_timezones($rhs_tz.as_deref())?,
-                );
-                let (naive_lhs, naive_rhs) = calculate_naives::<$mode>(
-                    ts1.mul_wrapping($coef),
-                    parsed_lhs_tz,
-                    ts2.mul_wrapping($coef),
-                    parsed_rhs_tz,
-                )?;
-                Ok($op($ts_unit(&naive_lhs), $ts_unit(&naive_rhs)))
-            })?;
-        Arc::new(ret) as ArrayRef
-    }};
-}
-macro_rules! interval_op {
-    ($lhs:ident, $rhs:ident, $caster:expr, $op:expr, $sign:ident, $type_in:ty) => {{
-        let prim_array_lhs = $caster(&$lhs)?;
-        let prim_array_rhs = $caster(&$rhs)?;
-        let ret = Arc::new(arrow::compute::binary::<$type_in, $type_in, _, $type_in>(
-            prim_array_lhs,
-            prim_array_rhs,
-            |interval1, interval2| $op(interval1, interval2, $sign),
-        )?) as ArrayRef;
-        ret
-    }};
-}
-macro_rules! interval_cross_op {
-    ($lhs:ident, $rhs:ident, $caster1:expr, $caster2:expr, $op:expr, $sign:ident, $commute:ident, $type_in1:ty, $type_in2:ty) => {{
-        let prim_array_lhs = $caster1(&$lhs)?;
-        let prim_array_rhs = $caster2(&$rhs)?;
-        let ret = Arc::new(arrow::compute::binary::<
-            $type_in1,
-            $type_in2,
-            _,
-            IntervalMonthDayNanoType,
-        >(
-            prim_array_lhs,
-            prim_array_rhs,
-            |interval1, interval2| $op(interval1, interval2, $sign, $commute),
-        )?) as ArrayRef;
-        ret
-    }};
-}
-macro_rules! ts_interval_op {
-    ($lhs:ident, $rhs:ident, $tz:ident, $caster1:expr, $caster2:expr, $op:expr, $sign:ident, $type_in1:ty, $type_in2:ty) => {{
-        let prim_array_lhs = $caster1(&$lhs)?;
-        let prim_array_rhs = $caster2(&$rhs)?;
-        let ret: PrimitiveArray<$type_in1> = arrow::compute::try_binary(
-            prim_array_lhs,
-            prim_array_rhs,
-            |ts, interval| Ok($op(ts, interval as i128, $sign)?),
-        )?;
-        Arc::new(ret.with_timezone_opt($tz.clone())) as ArrayRef
-    }};
-}
-
-/// Performs a timestamp subtraction operation on two arrays and returns the resulting array.
-pub fn ts_array_op(array_lhs: &ArrayRef, array_rhs: &ArrayRef) -> Result<ArrayRef> {
-    match (array_lhs.data_type(), array_rhs.data_type()) {
-        (
-            DataType::Timestamp(TimeUnit::Second, opt_tz_lhs),
-            DataType::Timestamp(TimeUnit::Second, opt_tz_rhs),
-        ) => Ok(ts_sub_op!(
-            array_lhs,
-            array_rhs,
-            opt_tz_lhs,
-            opt_tz_rhs,
-            1000i64,
-            as_timestamp_second_array,
-            seconds_sub,
-            NaiveDateTime::timestamp,
-            MILLISECOND_MODE,
-            IntervalDayTimeType
-        )),
-        (
-            DataType::Timestamp(TimeUnit::Millisecond, opt_tz_lhs),
-            DataType::Timestamp(TimeUnit::Millisecond, opt_tz_rhs),
-        ) => Ok(ts_sub_op!(
-            array_lhs,
-            array_rhs,
-            opt_tz_lhs,
-            opt_tz_rhs,
-            1i64,
-            as_timestamp_millisecond_array,
-            milliseconds_sub,
-            NaiveDateTime::timestamp_millis,
-            MILLISECOND_MODE,
-            IntervalDayTimeType
-        )),
-        (
-            DataType::Timestamp(TimeUnit::Microsecond, opt_tz_lhs),
-            DataType::Timestamp(TimeUnit::Microsecond, opt_tz_rhs),
-        ) => Ok(ts_sub_op!(
-            array_lhs,
-            array_rhs,
-            opt_tz_lhs,
-            opt_tz_rhs,
-            1000i64,
-            as_timestamp_microsecond_array,
-            microseconds_sub,
-            NaiveDateTime::timestamp_micros,
-            NANOSECOND_MODE,
-            IntervalMonthDayNanoType
-        )),
-        (
-            DataType::Timestamp(TimeUnit::Nanosecond, opt_tz_lhs),
-            DataType::Timestamp(TimeUnit::Nanosecond, opt_tz_rhs),
-        ) => Ok(ts_sub_op!(
-            array_lhs,
-            array_rhs,
-            opt_tz_lhs,
-            opt_tz_rhs,
-            1i64,
-            as_timestamp_nanosecond_array,
-            nanoseconds_sub,
-            NaiveDateTime::timestamp_nanos,
-            NANOSECOND_MODE,
-            IntervalMonthDayNanoType
-        )),
-        (_, _) => Err(DataFusionError::Execution(format!(
-            "Invalid array types for Timestamp subtraction: {} - {}",
-            array_lhs.data_type(),
-            array_rhs.data_type()
-        ))),
-    }
-}
-/// Performs an interval operation on two arrays and returns the resulting array.
-/// The operation sign determines whether to perform addition or subtraction.
-/// The data type and unit of the two input arrays must match the supported combinations.
-pub fn interval_array_op(
-    array_lhs: &ArrayRef,
-    array_rhs: &ArrayRef,
-    sign: i32,
+    swap: bool,
 ) -> Result<ArrayRef> {
-    match (array_lhs.data_type(), array_rhs.data_type()) {
-        (
-            DataType::Interval(IntervalUnit::YearMonth),
-            DataType::Interval(IntervalUnit::YearMonth),
-        ) => Ok(interval_op!(
-            array_lhs,
-            array_rhs,
-            as_interval_ym_array,
-            op_ym,
-            sign,
-            IntervalYearMonthType
-        )),
-        (
-            DataType::Interval(IntervalUnit::YearMonth),
-            DataType::Interval(IntervalUnit::DayTime),
-        ) => Ok(interval_cross_op!(
-            array_lhs,
-            array_rhs,
-            as_interval_ym_array,
-            as_interval_dt_array,
-            op_ym_dt,
-            sign,
-            false,
-            IntervalYearMonthType,
-            IntervalDayTimeType
-        )),
-        (
-            DataType::Interval(IntervalUnit::YearMonth),
-            DataType::Interval(IntervalUnit::MonthDayNano),
-        ) => Ok(interval_cross_op!(
-            array_lhs,
-            array_rhs,
-            as_interval_ym_array,
-            as_interval_mdn_array,
-            op_ym_mdn,
-            sign,
-            false,
-            IntervalYearMonthType,
-            IntervalMonthDayNanoType
-        )),
-        (
-            DataType::Interval(IntervalUnit::DayTime),
-            DataType::Interval(IntervalUnit::YearMonth),
-        ) => Ok(interval_cross_op!(
-            array_rhs,
-            array_lhs,
-            as_interval_ym_array,
-            as_interval_dt_array,
-            op_ym_dt,
-            sign,
-            true,
-            IntervalYearMonthType,
-            IntervalDayTimeType
-        )),
-        (
-            DataType::Interval(IntervalUnit::DayTime),
-            DataType::Interval(IntervalUnit::DayTime),
-        ) => Ok(interval_op!(
-            array_lhs,
-            array_rhs,
-            as_interval_dt_array,
-            op_dt,
-            sign,
-            IntervalDayTimeType
-        )),
-        (
-            DataType::Interval(IntervalUnit::DayTime),
-            DataType::Interval(IntervalUnit::MonthDayNano),
-        ) => Ok(interval_cross_op!(
-            array_lhs,
-            array_rhs,
-            as_interval_dt_array,
-            as_interval_mdn_array,
-            op_dt_mdn,
-            sign,
-            false,
-            IntervalDayTimeType,
-            IntervalMonthDayNanoType
-        )),
-        (
-            DataType::Interval(IntervalUnit::MonthDayNano),
-            DataType::Interval(IntervalUnit::YearMonth),
-        ) => Ok(interval_cross_op!(
-            array_rhs,
-            array_lhs,
-            as_interval_ym_array,
-            as_interval_mdn_array,
-            op_ym_mdn,
-            sign,
-            true,
-            IntervalYearMonthType,
-            IntervalMonthDayNanoType
+    match (sign, swap) {
+        (1, false) => add_dyn_temporal_right_scalar(arr, scalar),
+        (1, true) => add_dyn_temporal_left_scalar(scalar, arr),
+        (-1, false) => subtract_dyn_temporal_right_scalar(arr, scalar),
+        (-1, true) => subtract_dyn_temporal_left_scalar(scalar, arr),
+        _ => Err(DataFusionError::Internal(
+            "Undefined operation for temporal types".to_string(),
         )),
-        (
-            DataType::Interval(IntervalUnit::MonthDayNano),
-            DataType::Interval(IntervalUnit::DayTime),
-        ) => Ok(interval_cross_op!(
-            array_rhs,
-            array_lhs,
-            as_interval_dt_array,
-            as_interval_mdn_array,
-            op_dt_mdn,
-            sign,
-            true,
-            IntervalDayTimeType,
-            IntervalMonthDayNanoType
-        )),
-        (
-            DataType::Interval(IntervalUnit::MonthDayNano),
-            DataType::Interval(IntervalUnit::MonthDayNano),
-        ) => Ok(interval_op!(
-            array_lhs,
-            array_rhs,
-            as_interval_mdn_array,
-            op_mdn,
-            sign,
-            IntervalMonthDayNanoType
-        )),
-        (_, _) => Err(DataFusionError::Execution(format!(
-            "Invalid array types for Interval operation: {} {} {}",
-            array_lhs.data_type(),
-            sign,
-            array_rhs.data_type()
-        ))),
-    }
-}
-/// Performs a timestamp/interval operation on two arrays and returns the resulting array.
-/// The operation sign determines whether to perform addition or subtraction.
-/// The data type and unit of the two input arrays must match the supported combinations.
-pub fn ts_interval_array_op(
-    array_lhs: &ArrayRef,
-    sign: i32,
-    array_rhs: &ArrayRef,
-) -> Result<ArrayRef> {
-    match (array_lhs.data_type(), array_rhs.data_type()) {
-        (
-            DataType::Timestamp(TimeUnit::Second, tz),
-            DataType::Interval(IntervalUnit::YearMonth),
-        ) => Ok(ts_interval_op!(
-            array_lhs,
-            array_rhs,
-            tz,
-            as_timestamp_second_array,
-            as_interval_ym_array,
-            seconds_add_array::<YM_MODE>,
-            sign,
-            TimestampSecondType,
-            IntervalYearMonthType
-        )),
-        (
-            DataType::Timestamp(TimeUnit::Second, tz),
-            DataType::Interval(IntervalUnit::DayTime),
-        ) => Ok(ts_interval_op!(
-            array_lhs,
-            array_rhs,
-            tz,
-            as_timestamp_second_array,
-            as_interval_dt_array,
-            seconds_add_array::<DT_MODE>,
-            sign,
-            TimestampSecondType,
-            IntervalDayTimeType
-        )),
-        (
-            DataType::Timestamp(TimeUnit::Second, tz),
-            DataType::Interval(IntervalUnit::MonthDayNano),
-        ) => Ok(ts_interval_op!(
-            array_lhs,
-            array_rhs,
-            tz,
-            as_timestamp_second_array,
-            as_interval_mdn_array,
-            seconds_add_array::<MDN_MODE>,
-            sign,
-            TimestampSecondType,
-            IntervalMonthDayNanoType
-        )),
-        (
-            DataType::Timestamp(TimeUnit::Millisecond, tz),
-            DataType::Interval(IntervalUnit::YearMonth),
-        ) => Ok(ts_interval_op!(
-            array_lhs,
-            array_rhs,
-            tz,
-            as_timestamp_millisecond_array,
-            as_interval_ym_array,
-            milliseconds_add_array::<YM_MODE>,
-            sign,
-            TimestampMillisecondType,
-            IntervalYearMonthType
-        )),
-        (
-            DataType::Timestamp(TimeUnit::Millisecond, tz),
-            DataType::Interval(IntervalUnit::DayTime),
-        ) => Ok(ts_interval_op!(
-            array_lhs,
-            array_rhs,
-            tz,
-            as_timestamp_millisecond_array,
-            as_interval_dt_array,
-            milliseconds_add_array::<DT_MODE>,
-            sign,
-            TimestampMillisecondType,
-            IntervalDayTimeType
-        )),
-        (
-            DataType::Timestamp(TimeUnit::Millisecond, tz),
-            DataType::Interval(IntervalUnit::MonthDayNano),
-        ) => Ok(ts_interval_op!(
-            array_lhs,
-            array_rhs,
-            tz,
-            as_timestamp_millisecond_array,
-            as_interval_mdn_array,
-            milliseconds_add_array::<MDN_MODE>,
-            sign,
-            TimestampMillisecondType,
-            IntervalMonthDayNanoType
-        )),
-        (
-            DataType::Timestamp(TimeUnit::Microsecond, tz),
-            DataType::Interval(IntervalUnit::YearMonth),
-        ) => Ok(ts_interval_op!(
-            array_lhs,
-            array_rhs,
-            tz,
-            as_timestamp_microsecond_array,
-            as_interval_ym_array,
-            microseconds_add_array::<YM_MODE>,
-            sign,
-            TimestampMicrosecondType,
-            IntervalYearMonthType
-        )),
-        (
-            DataType::Timestamp(TimeUnit::Microsecond, tz),
-            DataType::Interval(IntervalUnit::DayTime),
-        ) => Ok(ts_interval_op!(
-            array_lhs,
-            array_rhs,
-            tz,
-            as_timestamp_microsecond_array,
-            as_interval_dt_array,
-            microseconds_add_array::<DT_MODE>,
-            sign,
-            TimestampMicrosecondType,
-            IntervalDayTimeType
-        )),
-        (
-            DataType::Timestamp(TimeUnit::Microsecond, tz),
-            DataType::Interval(IntervalUnit::MonthDayNano),
-        ) => Ok(ts_interval_op!(
-            array_lhs,
-            array_rhs,
-            tz,
-            as_timestamp_microsecond_array,
-            as_interval_mdn_array,
-            microseconds_add_array::<MDN_MODE>,
-            sign,
-            TimestampMicrosecondType,
-            IntervalMonthDayNanoType
-        )),
-        (
-            DataType::Timestamp(TimeUnit::Nanosecond, tz),
-            DataType::Interval(IntervalUnit::YearMonth),
-        ) => Ok(ts_interval_op!(
-            array_lhs,
-            array_rhs,
-            tz,
-            as_timestamp_nanosecond_array,
-            as_interval_ym_array,
-            nanoseconds_add_array::<YM_MODE>,
-            sign,
-            TimestampNanosecondType,
-            IntervalYearMonthType
-        )),
-        (
-            DataType::Timestamp(TimeUnit::Nanosecond, tz),
-            DataType::Interval(IntervalUnit::DayTime),
-        ) => Ok(ts_interval_op!(
-            array_lhs,
-            array_rhs,
-            tz,
-            as_timestamp_nanosecond_array,
-            as_interval_dt_array,
-            nanoseconds_add_array::<DT_MODE>,
-            sign,
-            TimestampNanosecondType,
-            IntervalDayTimeType
-        )),
-        (
-            DataType::Timestamp(TimeUnit::Nanosecond, tz),
-            DataType::Interval(IntervalUnit::MonthDayNano),
-        ) => Ok(ts_interval_op!(
-            array_lhs,
-            array_rhs,
-            tz,
-            as_timestamp_nanosecond_array,
-            as_interval_mdn_array,
-            nanoseconds_add_array::<MDN_MODE>,
-            sign,
-            TimestampNanosecondType,
-            IntervalMonthDayNanoType
-        )),
-        (_, _) => Err(DataFusionError::Execution(format!(
-            "Invalid array types for Timestamp Interval operation: {} {} {}",
-            array_lhs.data_type(),
-            sign,
-            array_rhs.data_type()
-        ))),
     }
 }
 
diff --git a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
index 50a9f86c06..4d984ac8e8 100644
--- a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
+++ b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
@@ -31,19 +31,35 @@ use arrow::{array::*, datatypes::ArrowNumericType, downcast_dictionary_array};
 use arrow_array::types::{ArrowDictionaryKeyType, DecimalType};
 use arrow_array::ArrowNativeTypeOp;
 use arrow_buffer::ArrowNativeType;
-use arrow_schema::DataType;
+use arrow_schema::{DataType, IntervalUnit};
+use chrono::{Days, Duration, Months, NaiveDate, NaiveDateTime};
 use datafusion_common::cast::{as_date32_array, as_date64_array, as_decimal128_array};
-use datafusion_common::scalar::{date32_add, date64_add};
+use datafusion_common::scalar::{date32_op, date64_op};
 use datafusion_common::{DataFusionError, Result, ScalarValue};
-use datafusion_expr::ColumnarValue;
 use std::cmp::min;
+use std::ops::Add;
 use std::sync::Arc;
 
-use super::{
-    interval_array_op, interval_scalar_interval_op, ts_array_op, ts_interval_array_op,
-    ts_scalar_interval_op, ts_scalar_ts_op,
+use arrow::compute::unary;
+use arrow::datatypes::*;
+
+use arrow_array::temporal_conversions::{MILLISECONDS_IN_DAY, NANOSECONDS_IN_DAY};
+use datafusion_common::delta::shift_months;
+use datafusion_common::scalar::{
+    calculate_naives, microseconds_add, microseconds_sub, milliseconds_add,
+    milliseconds_sub, nanoseconds_add, nanoseconds_sub, op_dt, op_dt_mdn, op_mdn, op_ym,
+    op_ym_dt, op_ym_mdn, parse_timezones, seconds_add, MILLISECOND_MODE, NANOSECOND_MODE,
 };
 
+use arrow::datatypes::TimeUnit;
+
+use datafusion_common::cast::{
+    as_interval_dt_array, as_interval_mdn_array, as_interval_ym_array,
+    as_timestamp_microsecond_array, as_timestamp_millisecond_array,
+    as_timestamp_nanosecond_array, as_timestamp_second_array,
+};
+use datafusion_common::scalar::*;
+
 // Simple (low performance) kernels until optimized kernels are added to arrow
 // See https://github.com/apache/arrow-rs/issues/960
 
@@ -333,34 +349,88 @@ pub(crate) fn add_dyn_temporal(left: &ArrayRef, right: &ArrayRef) -> Result<Arra
     }
 }
 
-pub(crate) fn add_dyn_temporal_scalar(
+pub(crate) fn add_dyn_temporal_right_scalar(
     left: &ArrayRef,
     right: &ScalarValue,
-) -> Result<ColumnarValue> {
+) -> Result<ArrayRef> {
     match (left.data_type(), right.get_datatype()) {
+        // Date32 + Interval
         (DataType::Date32, DataType::Interval(..)) => {
             let left = as_date32_array(&left)?;
             let ret = Arc::new(try_unary::<Date32Type, _, Date32Type>(left, |days| {
-                Ok(date32_add(days, right, 1)?)
-            })?) as ArrayRef;
-            Ok(ColumnarValue::Array(ret))
+                Ok(date32_op(days, right, 1)?)
+            })?) as _;
+            Ok(ret)
         }
+        // Date64 + Interval
         (DataType::Date64, DataType::Interval(..)) => {
             let left = as_date64_array(&left)?;
             let ret = Arc::new(try_unary::<Date64Type, _, Date64Type>(left, |ms| {
-                Ok(date64_add(ms, right, 1)?)
-            })?) as ArrayRef;
-            Ok(ColumnarValue::Array(ret))
+                Ok(date64_op(ms, right, 1)?)
+            })?) as _;
+            Ok(ret)
+        }
+        // Interval + Interval
+        (DataType::Interval(..), DataType::Interval(..)) => {
+            interval_op_scalar_interval(left, 1, right)
+        }
+        // Timestamp + Interval
+        (DataType::Timestamp(..), DataType::Interval(..)) => {
+            ts_op_scalar_interval(left, 1, right)
+        }
+        _ => {
+            // fall back to kernels in arrow-rs
+            Ok(add_dyn(left, &right.to_array())?)
+        }
+    }
+}
+
+pub(crate) fn add_dyn_temporal_left_scalar(
+    left: &ScalarValue,
+    right: &ArrayRef,
+) -> Result<ArrayRef> {
+    match (left.get_datatype(), right.data_type()) {
+        // Date32 + Interval
+        (DataType::Date32, DataType::Interval(..)) => {
+            if let ScalarValue::Date32(Some(left)) = left {
+                scalar_date32_array_interval_op(
+                    *left,
+                    right,
+                    NaiveDate::checked_add_days,
+                    NaiveDate::checked_add_months,
+                )
+            } else {
+                Err(DataFusionError::Internal(
+                    "Date32 value is None".to_string(),
+                ))
+            }
         }
+        // Date64 + Interval
+        (DataType::Date64, DataType::Interval(..)) => {
+            if let ScalarValue::Date64(Some(left)) = left {
+                scalar_date64_array_interval_op(
+                    *left,
+                    right,
+                    NaiveDate::checked_add_days,
+                    NaiveDate::checked_add_months,
+                )
+            } else {
+                Err(DataFusionError::Internal(
+                    "Date64 value is None".to_string(),
+                ))
+            }
+        }
+        // Interval + Interval
         (DataType::Interval(..), DataType::Interval(..)) => {
-            interval_scalar_interval_op(left, 1, right)
+            scalar_interval_op_interval(left, 1, right)
         }
+        // Timestamp + Interval
         (DataType::Timestamp(..), DataType::Interval(..)) => {
-            ts_scalar_interval_op(left, 1, right)
+            scalar_ts_op_interval(left, 1, right)
         }
         _ => {
             // fall back to kernels in arrow-rs
-            Ok(ColumnarValue::Array(add_dyn(left, &right.to_array())?))
+            Ok(add_dyn(&left.to_array(), right)?)
         }
     }
 }
@@ -398,41 +468,152 @@ pub(crate) fn subtract_dyn_temporal(
     }
 }
 
-pub(crate) fn subtract_dyn_temporal_scalar(
+/// Subtracts a scalar (right operand) from an array (left operand) and
+/// returns the resulting array.
+///
+/// The `(left, right)` data types select the kernel:
+/// - Date32/Date64 - Interval: element-wise `date32_op` / `date64_op` called
+///   with `-1` as their last argument,
+/// - Timestamp - Timestamp: `ts_sub_scalar_ts`,
+/// - Interval - Interval: `interval_op_scalar_interval` with sign `-1`,
+/// - Timestamp - Interval: `ts_op_scalar_interval` with sign `-1`,
+/// - anything else: the scalar is materialized with `to_array()` and arrow's
+///   `subtract_dyn` is used.
+pub(crate) fn subtract_dyn_temporal_right_scalar(
     left: &ArrayRef,
     right: &ScalarValue,
-) -> Result<ColumnarValue> {
+) -> Result<ArrayRef> {
     match (left.data_type(), right.get_datatype()) {
+        // Date32 - Interval
         (DataType::Date32, DataType::Interval(..)) => {
             let left = as_date32_array(&left)?;
             let ret = Arc::new(try_unary::<Date32Type, _, Date32Type>(left, |days| {
-                Ok(date32_add(days, right, -1)?)
-            })?) as ArrayRef;
-            Ok(ColumnarValue::Array(ret))
+                Ok(date32_op(days, right, -1)?)
+            })?) as _;
+            Ok(ret)
         }
+        // Date64 - Interval
         (DataType::Date64, DataType::Interval(..)) => {
             let left = as_date64_array(&left)?;
             let ret = Arc::new(try_unary::<Date64Type, _, Date64Type>(left, |ms| {
-                Ok(date64_add(ms, right, -1)?)
-            })?) as ArrayRef;
-            Ok(ColumnarValue::Array(ret))
+                Ok(date64_op(ms, right, -1)?)
+            })?) as _;
+            Ok(ret)
         }
+        // Timestamp - Timestamp
         (DataType::Timestamp(..), DataType::Timestamp(..)) => {
-            ts_scalar_ts_op(left, right)
+            ts_sub_scalar_ts(left, right)
         }
+        // Interval - Interval
         (DataType::Interval(..), DataType::Interval(..)) => {
-            interval_scalar_interval_op(left, -1, right)
+            interval_op_scalar_interval(left, -1, right)
         }
+        // Timestamp - Interval
         (DataType::Timestamp(..), DataType::Interval(..)) => {
-            ts_scalar_interval_op(left, -1, right)
+            ts_op_scalar_interval(left, -1, right)
         }
         _ => {
             // fall back to kernels in arrow-rs
-            Ok(ColumnarValue::Array(subtract_dyn(left, &right.to_array())?))
+            Ok(subtract_dyn(left, &right.to_array())?)
         }
     }
 }
 
+/// Subtracts an array (right operand) from a scalar (left operand) and
+/// returns the resulting array.
+///
+/// The `(left, right)` data types select the kernel:
+/// - Date32/Date64 - Interval: `scalar_date32_array_interval_op` /
+///   `scalar_date64_array_interval_op` with chrono's `checked_sub_*` functions,
+/// - Timestamp - Timestamp: `scalar_ts_sub_ts`,
+/// - Interval - Interval: `scalar_interval_op_interval` with sign `-1`,
+/// - Timestamp - Interval: `scalar_ts_op_interval` with sign `-1`,
+/// - anything else: the scalar is materialized with `to_array()` and arrow's
+///   `subtract_dyn` is used.
+///
+/// An internal error is returned when the left data type is Date32/Date64 but
+/// the scalar does not hold a value.
+pub(crate) fn subtract_dyn_temporal_left_scalar(
+    left: &ScalarValue,
+    right: &ArrayRef,
+) -> Result<ArrayRef> {
+    match (left.get_datatype(), right.data_type()) {
+        // Date32 - Interval
+        (DataType::Date32, DataType::Interval(..)) => {
+            if let ScalarValue::Date32(Some(left)) = left {
+                scalar_date32_array_interval_op(
+                    *left,
+                    right,
+                    NaiveDate::checked_sub_days,
+                    NaiveDate::checked_sub_months,
+                )
+            } else {
+                Err(DataFusionError::Internal(
+                    "Date32 value is None".to_string(),
+                ))
+            }
+        }
+        // Date64 - Interval
+        (DataType::Date64, DataType::Interval(..)) => {
+            if let ScalarValue::Date64(Some(left)) = left {
+                scalar_date64_array_interval_op(
+                    *left,
+                    right,
+                    NaiveDate::checked_sub_days,
+                    NaiveDate::checked_sub_months,
+                )
+            } else {
+                Err(DataFusionError::Internal(
+                    "Date64 value is None".to_string(),
+                ))
+            }
+        }
+        // Timestamp - Timestamp
+        (DataType::Timestamp(..), DataType::Timestamp(..)) => {
+            scalar_ts_sub_ts(left, right)
+        }
+        // Interval - Interval
+        (DataType::Interval(..), DataType::Interval(..)) => {
+            scalar_interval_op_interval(left, -1, right)
+        }
+        // Timestamp - Interval
+        (DataType::Timestamp(..), DataType::Interval(..)) => {
+            scalar_ts_op_interval(left, -1, right)
+        }
+        _ => {
+            // fall back to kernels in arrow-rs
+            Ok(subtract_dyn(&left.to_array(), right)?)
+        }
+    }
+}
+
+/// Combines a Date32 scalar with every element of an interval array.
+///
+/// `left` is interpreted as a number of days relative to 1970-01-01. `day_op`
+/// and `month_op` are the chrono functions that apply the day and month parts
+/// of an interval (callers pass the `checked_add_*` or `checked_sub_*`
+/// variants to select addition or subtraction). The interval unit of `right`
+/// selects the `date32_interval_{ym,dt,mdn}_op` helper that does the work.
+///
+/// # Errors
+/// - `Execution` if `left` cannot be represented as a `NaiveDate`,
+/// - `Internal` if `right` is not an interval array.
+fn scalar_date32_array_interval_op(
+    left: i32,
+    right: &ArrayRef,
+    day_op: fn(NaiveDate, Days) -> Option<NaiveDate>,
+    month_op: fn(NaiveDate, Months) -> Option<NaiveDate>,
+) -> Result<ArrayRef> {
+    let epoch = NaiveDate::from_ymd_opt(1970, 1, 1)
+        .ok_or_else(|| DataFusionError::Execution("Invalid Date entered".to_string()))?;
+    // Use the checked addition: `NaiveDate + Duration` panics when the result
+    // is outside chrono's supported range, which a large Date32 value can be.
+    let prior = epoch
+        .checked_add_signed(Duration::days(left as i64))
+        .ok_or_else(|| {
+            DataFusionError::Execution(format!(
+                "Date32 value {} is out of the supported date range",
+                left
+            ))
+        })?;
+    match right.data_type() {
+        DataType::Interval(IntervalUnit::YearMonth) => {
+            date32_interval_ym_op(right, &epoch, &prior, month_op)
+        }
+        DataType::Interval(IntervalUnit::DayTime) => {
+            date32_interval_dt_op(right, &epoch, &prior, day_op)
+        }
+        DataType::Interval(IntervalUnit::MonthDayNano) => {
+            date32_interval_mdn_op(right, &epoch, &prior, day_op, month_op)
+        }
+        _ => Err(DataFusionError::Internal(format!(
+            "Expected type is an interval, but {} is found",
+            right.data_type()
+        ))),
+    }
+}
+
+/// Combines a Date64 scalar with every element of an interval array.
+///
+/// `left` is interpreted as a number of milliseconds relative to 1970-01-01.
+/// `day_op` and `month_op` are the chrono functions that apply the day and
+/// month parts of an interval (callers pass the `checked_add_*` or
+/// `checked_sub_*` variants to select addition or subtraction). The interval
+/// unit of `right` selects the `date64_interval_{ym,dt,mdn}_op` helper that
+/// does the work.
+///
+/// # Errors
+/// - `Execution` if `left` cannot be represented as a `NaiveDate`,
+/// - `Internal` if `right` is not an interval array.
+fn scalar_date64_array_interval_op(
+    left: i64,
+    right: &ArrayRef,
+    day_op: fn(NaiveDate, Days) -> Option<NaiveDate>,
+    month_op: fn(NaiveDate, Months) -> Option<NaiveDate>,
+) -> Result<ArrayRef> {
+    let epoch = NaiveDate::from_ymd_opt(1970, 1, 1)
+        .ok_or_else(|| DataFusionError::Execution("Invalid Date entered".to_string()))?;
+    // Use the checked addition: `NaiveDate + Duration` panics when the result
+    // is outside chrono's supported range, which a large Date64 value can be.
+    let prior = epoch
+        .checked_add_signed(Duration::milliseconds(left))
+        .ok_or_else(|| {
+            DataFusionError::Execution(format!(
+                "Date64 value {} is out of the supported date range",
+                left
+            ))
+        })?;
+    match right.data_type() {
+        DataType::Interval(IntervalUnit::YearMonth) => {
+            date64_interval_ym_op(right, &epoch, &prior, month_op)
+        }
+        DataType::Interval(IntervalUnit::DayTime) => {
+            date64_interval_dt_op(right, &epoch, &prior, day_op)
+        }
+        DataType::Interval(IntervalUnit::MonthDayNano) => {
+            date64_interval_mdn_op(right, &epoch, &prior, day_op, month_op)
+        }
+        _ => Err(DataFusionError::Internal(format!(
+            "Expected type is an interval, but {} is found",
+            right.data_type()
+        ))),
+    }
+}
+
 fn get_precision_scale(data_type: &DataType) -> Result<(u8, i8)> {
     match data_type {
         DataType::Decimal128(precision, scale) => Ok((*precision, *scale)),
@@ -704,6 +885,1297 @@ pub(crate) fn modulus_decimal_dyn_scalar(
     decimal_array_with_precision_scale(array, precision, scale)
 }
 
+/// Timestamp array minus timestamp scalar.
+///
+/// Downcasts `$array` with `$caster` and, for every element `lhs`, computes
+/// `$naive_sub_fn($counter(naive_lhs), $counter(naive_rhs))`, where the naive
+/// values come from `calculate_naives::<$unit_sub>` applied to `lhs` and
+/// `$rhs` (both multiplied by `$multiplier` with wrapping arithmetic) and the
+/// time zones parsed from `$opt_tz_lhs` / `$opt_tz_rhs`.
+/// The results are collected into a `PrimitiveArray<$interval_type>` and the
+/// macro evaluates to that array wrapped in an `Arc`.
+///
+/// Note that the time zones are parsed inside the closure, i.e. for every
+/// element the closure is invoked on.
+macro_rules! sub_timestamp_macro {
+    ($array:expr, $rhs:expr, $caster:expr, $interval_type:ty, $opt_tz_lhs:expr, $multiplier:expr,
+        $opt_tz_rhs:expr, $unit_sub:expr, $naive_sub_fn:expr, $counter:expr) => {{
+        let prim_array = $caster(&$array)?;
+        let ret: PrimitiveArray<$interval_type> = try_unary(prim_array, |lhs| {
+            let (parsed_lhs_tz, parsed_rhs_tz) =
+                (parse_timezones($opt_tz_lhs)?, parse_timezones($opt_tz_rhs)?);
+            let (naive_lhs, naive_rhs) = calculate_naives::<$unit_sub>(
+                lhs.mul_wrapping($multiplier),
+                parsed_lhs_tz,
+                $rhs.mul_wrapping($multiplier),
+                parsed_rhs_tz,
+            )?;
+            Ok($naive_sub_fn($counter(&naive_lhs), $counter(&naive_rhs)))
+        })?;
+        Arc::new(ret) as _
+    }};
+}
+
+/// Timestamp scalar minus timestamp array.
+///
+/// Mirror image of `sub_timestamp_macro`: `$lhs` is the scalar and the
+/// elements of `$array` (downcast with `$caster`) are the right-hand sides.
+/// For every element `rhs` the macro computes
+/// `$naive_sub_fn($counter(naive_lhs), $counter(naive_rhs))`, where the naive
+/// values come from `calculate_naives::<$unit_sub>` applied to `$lhs` and
+/// `rhs` (both multiplied by `$multiplier` with wrapping arithmetic) and the
+/// time zones parsed from `$opt_tz_lhs` / `$opt_tz_rhs`.
+/// It evaluates to a `PrimitiveArray<$interval_type>` wrapped in an `Arc`.
+///
+/// Note that the time zones are parsed inside the closure, i.e. for every
+/// element the closure is invoked on.
+macro_rules! sub_timestamp_left_scalar_macro {
+    ($array:expr, $lhs:expr, $caster:expr, $interval_type:ty, $opt_tz_lhs:expr, $multiplier:expr,
+        $opt_tz_rhs:expr, $unit_sub:expr, $naive_sub_fn:expr, $counter:expr) => {{
+        let prim_array = $caster(&$array)?;
+        let ret: PrimitiveArray<$interval_type> = try_unary(prim_array, |rhs| {
+            let (parsed_lhs_tz, parsed_rhs_tz) =
+                (parse_timezones($opt_tz_lhs)?, parse_timezones($opt_tz_rhs)?);
+            let (naive_lhs, naive_rhs) = calculate_naives::<$unit_sub>(
+                $lhs.mul_wrapping($multiplier),
+                parsed_lhs_tz,
+                rhs.mul_wrapping($multiplier),
+                parsed_rhs_tz,
+            )?;
+            Ok($naive_sub_fn($counter(&naive_lhs), $counter(&naive_rhs)))
+        })?;
+        Arc::new(ret) as _
+    }};
+}
+
+/// Timestamp array +/- interval scalar.
+///
+/// Downcasts `$array` with `$as_timestamp` and maps every element `ts_s` to
+/// `$fn_op(ts_s, $scalar, $sign)`, propagating any error the function returns.
+/// Input and output share the timestamp type `$ts_type`; the time zone `$tz`
+/// is attached to the result, which is returned wrapped in an `Arc`.
+macro_rules! op_timestamp_interval_macro {
+    ($array:expr, $as_timestamp:expr, $ts_type:ty, $fn_op:expr, $scalar:expr, $sign:expr, $tz:expr) => {{
+        let array = $as_timestamp(&$array)?;
+        let ret: PrimitiveArray<$ts_type> =
+            try_unary::<$ts_type, _, $ts_type>(array, |ts_s| {
+                Ok($fn_op(ts_s, $scalar, $sign)?)
+            })?;
+        Arc::new(ret.with_timezone_opt($tz.clone())) as _
+    }};
+}
+
+/// Timestamp scalar +/- interval array.
+///
+/// `$ts` is an `Option` holding the scalar timestamp already converted to a
+/// chrono value; when it is `None` the closure fails with the internal error
+/// "Timestamp is out-of-range". `$interval` is downcast with `$caster1` to an
+/// array of `$type2`, and every element `e` is mapped to
+/// `$back_caster(&$op(prior, e, $sign))`. The results form a
+/// `PrimitiveArray<$type1>` to which the time zone `$tz` is attached; the
+/// macro evaluates to that array wrapped in an `Arc`.
+///
+/// `$type1` and `$back_caster` must agree with the time unit of the scalar,
+/// because `$type1` alone determines the time unit of the output array.
+macro_rules! scalar_ts_op_interval_macro {
+    ($ts:ident, $tz:ident, $interval:ident, $sign:ident,
+        $caster1:expr, $type1:ty, $type2:ty, $op:expr, $back_caster:expr) => {{
+        let interval = $caster1(&$interval)?;
+        let ret: PrimitiveArray<$type1> =
+            try_unary::<$type2, _, $type1>(interval, |e| {
+                let prior = $ts.ok_or_else(|| {
+                    DataFusionError::Internal("Timestamp is out-of-range".to_string())
+                })?;
+                Ok($back_caster(&$op(prior, e, $sign)))
+            })?;
+        Arc::new(ret.with_timezone_opt($tz.clone())) as _
+    }};
+}
+
+/// Interval array +/- interval scalar, both of the same interval type.
+///
+/// Downcasts `$array` with `$as_interval` and maps every element `lhs` to
+/// `$fn_op(lhs, *$scalar, $sign)`. `$scalar` is dereferenced, so it must be a
+/// reference to the native interval value. Evaluates to a
+/// `PrimitiveArray<$interval_type>` wrapped in an `Arc`.
+macro_rules! op_interval_macro {
+    ($array:expr, $as_interval:expr, $interval_type:ty, $fn_op:expr, $scalar:expr, $sign:expr) => {{
+        let array = $as_interval(&$array)?;
+        let ret: PrimitiveArray<$interval_type> =
+            unary(array, |lhs| $fn_op(lhs, *$scalar, $sign));
+        Arc::new(ret) as _
+    }};
+}
+
+/// Interval array +/- interval scalar where the two interval types differ.
+///
+/// The result is always an `IntervalMonthDayNanoType` array (wrapped in an
+/// `Arc`). `$fn_op` takes its operands in a fixed type order (`$t1`, `$t2`):
+/// - when `$commute` is `false`, the array element is passed first (as `$t1`)
+///   and the scalar second (as `$t2`);
+/// - when `$commute` is `true`, the scalar is passed first (as `$t1`) and the
+///   array element second (as `$t2`).
+/// In both cases `$sign` and `$commute` are forwarded to `$fn_op`.
+macro_rules! op_interval_cross_macro {
+    ($array:expr, $as_interval:expr, $commute:expr, $fn_op:expr, $scalar:expr, $sign:expr, $t1:ty, $t2:ty) => {{
+        let array = $as_interval(&$array)?;
+        let ret: PrimitiveArray<IntervalMonthDayNanoType> = if $commute {
+            unary(array, |lhs| {
+                $fn_op(*$scalar as $t1, lhs as $t2, $sign, $commute)
+            })
+        } else {
+            unary(array, |lhs| {
+                $fn_op(lhs as $t1, *$scalar as $t2, $sign, $commute)
+            })
+        };
+        Arc::new(ret) as _
+    }};
+}
+
+/// Timestamp array minus timestamp array.
+///
+/// Both `$lhs` and `$rhs` are downcast with the same `$caster`. For every
+/// pair `(ts1, ts2)` the macro computes
+/// `$op($ts_unit(naive_lhs), $ts_unit(naive_rhs))`, where the naive values
+/// come from `calculate_naives::<$mode>` applied to the two timestamps (both
+/// multiplied by `$coef` with wrapping arithmetic) and the time zones parsed
+/// from `$lhs_tz` / `$rhs_tz`. Evaluates to a `PrimitiveArray<$type_out>`
+/// wrapped in an `Arc`.
+///
+/// Note that the time zones are parsed inside the closure, i.e. for every
+/// pair the closure is invoked on.
+macro_rules! ts_sub_op {
+    ($lhs:ident, $rhs:ident, $lhs_tz:ident, $rhs_tz:ident, $coef:expr, $caster:expr, $op:expr, $ts_unit:expr, $mode:expr, $type_out:ty) => {{
+        let prim_array_lhs = $caster(&$lhs)?;
+        let prim_array_rhs = $caster(&$rhs)?;
+        let ret: PrimitiveArray<$type_out> =
+            arrow::compute::try_binary(prim_array_lhs, prim_array_rhs, |ts1, ts2| {
+                let (parsed_lhs_tz, parsed_rhs_tz) = (
+                    parse_timezones($lhs_tz.as_deref())?,
+                    parse_timezones($rhs_tz.as_deref())?,
+                );
+                let (naive_lhs, naive_rhs) = calculate_naives::<$mode>(
+                    ts1.mul_wrapping($coef),
+                    parsed_lhs_tz,
+                    ts2.mul_wrapping($coef),
+                    parsed_rhs_tz,
+                )?;
+                Ok($op($ts_unit(&naive_lhs), $ts_unit(&naive_rhs)))
+            })?;
+        Arc::new(ret) as _
+    }};
+}
+
+/// Interval array +/- interval array, both of the same interval type.
+///
+/// Downcasts `$lhs` and `$rhs` with `$caster` and combines them element-wise
+/// with `$op(interval1, interval2, $sign)`. Input and output arrays all have
+/// the type `$type_in`; the result is wrapped in an `Arc`.
+macro_rules! interval_op {
+    ($lhs:ident, $rhs:ident, $caster:expr, $op:expr, $sign:ident, $type_in:ty) => {{
+        let prim_array_lhs = $caster(&$lhs)?;
+        let prim_array_rhs = $caster(&$rhs)?;
+        Arc::new(arrow::compute::binary::<$type_in, $type_in, _, $type_in>(
+            prim_array_lhs,
+            prim_array_rhs,
+            |interval1, interval2| $op(interval1, interval2, $sign),
+        )?) as _
+    }};
+}
+
+/// Interval array +/- interval array where the two interval types differ.
+///
+/// `$lhs` is downcast with `$caster1` (element type `$type_in1`) and `$rhs`
+/// with `$caster2` (element type `$type_in2`). The arrays are combined
+/// element-wise with `$op(interval1, interval2, $sign, $commute)` and the
+/// result is always an `IntervalMonthDayNanoType` array wrapped in an `Arc`.
+macro_rules! interval_cross_op {
+    ($lhs:ident, $rhs:ident, $caster1:expr, $caster2:expr, $op:expr, $sign:ident, $commute:ident, $type_in1:ty, $type_in2:ty) => {{
+        let prim_array_lhs = $caster1(&$lhs)?;
+        let prim_array_rhs = $caster2(&$rhs)?;
+        Arc::new(arrow::compute::binary::<
+            $type_in1,
+            $type_in2,
+            _,
+            IntervalMonthDayNanoType,
+        >(
+            prim_array_lhs,
+            prim_array_rhs,
+            |interval1, interval2| $op(interval1, interval2, $sign, $commute),
+        )?) as _
+    }};
+}
+
+/// Timestamp array +/- interval array.
+///
+/// `$lhs` is downcast with `$caster1` (timestamp type `$type_in1`) and `$rhs`
+/// with `$caster2`. Every pair is combined with
+/// `$op(ts, interval as i128, $sign)`, propagating any error the function
+/// returns. The result keeps the timestamp type `$type_in1`, gets the time
+/// zone `$tz` attached and is wrapped in an `Arc`.
+///
+/// `$type_in2` is accepted by the pattern but not used in the expansion.
+macro_rules! ts_interval_op {
+    ($lhs:ident, $rhs:ident, $tz:ident, $caster1:expr, $caster2:expr, $op:expr, $sign:ident, $type_in1:ty, $type_in2:ty) => {{
+        let prim_array_lhs = $caster1(&$lhs)?;
+        let prim_array_rhs = $caster2(&$rhs)?;
+        let ret: PrimitiveArray<$type_in1> = arrow::compute::try_binary(
+            prim_array_lhs,
+            prim_array_rhs,
+            |ts, interval| Ok($op(ts, interval as i128, $sign)?),
+        )?;
+        Arc::new(ret.with_timezone_opt($tz.clone())) as _
+    }};
+}
+
+/// This function handles timestamp - timestamp operations where the former is
+/// an array and the latter is a scalar, resulting in an array.
+///
+/// The array and the scalar must have the same time unit and the scalar must
+/// hold a value; otherwise an internal error is returned. Second and
+/// millisecond inputs produce an `IntervalDayTimeType` array, microsecond and
+/// nanosecond inputs produce an `IntervalMonthDayNanoType` array. The time
+/// zones of both operands are forwarded to `sub_timestamp_macro`.
+pub fn ts_sub_scalar_ts(array: &ArrayRef, scalar: &ScalarValue) -> Result<ArrayRef> {
+    let ret = match (array.data_type(), scalar) {
+        (
+            DataType::Timestamp(TimeUnit::Second, opt_tz_lhs),
+            ScalarValue::TimestampSecond(Some(rhs), opt_tz_rhs),
+        ) => {
+            sub_timestamp_macro!(
+                array,
+                rhs,
+                as_timestamp_second_array,
+                IntervalDayTimeType,
+                opt_tz_lhs.as_deref(),
+                1000,
+                opt_tz_rhs.as_deref(),
+                MILLISECOND_MODE,
+                seconds_sub,
+                NaiveDateTime::timestamp
+            )
+        }
+        (
+            DataType::Timestamp(TimeUnit::Millisecond, opt_tz_lhs),
+            ScalarValue::TimestampMillisecond(Some(rhs), opt_tz_rhs),
+        ) => {
+            sub_timestamp_macro!(
+                array,
+                rhs,
+                as_timestamp_millisecond_array,
+                IntervalDayTimeType,
+                opt_tz_lhs.as_deref(),
+                1,
+                opt_tz_rhs.as_deref(),
+                MILLISECOND_MODE,
+                milliseconds_sub,
+                NaiveDateTime::timestamp_millis
+            )
+        }
+        (
+            DataType::Timestamp(TimeUnit::Microsecond, opt_tz_lhs),
+            ScalarValue::TimestampMicrosecond(Some(rhs), opt_tz_rhs),
+        ) => {
+            sub_timestamp_macro!(
+                array,
+                rhs,
+                as_timestamp_microsecond_array,
+                IntervalMonthDayNanoType,
+                opt_tz_lhs.as_deref(),
+                1000,
+                opt_tz_rhs.as_deref(),
+                NANOSECOND_MODE,
+                microseconds_sub,
+                NaiveDateTime::timestamp_micros
+            )
+        }
+        (
+            DataType::Timestamp(TimeUnit::Nanosecond, opt_tz_lhs),
+            ScalarValue::TimestampNanosecond(Some(rhs), opt_tz_rhs),
+        ) => {
+            sub_timestamp_macro!(
+                array,
+                rhs,
+                as_timestamp_nanosecond_array,
+                IntervalMonthDayNanoType,
+                opt_tz_lhs.as_deref(),
+                1,
+                opt_tz_rhs.as_deref(),
+                NANOSECOND_MODE,
+                nanoseconds_sub,
+                NaiveDateTime::timestamp_nanos
+            )
+        }
+        // Mismatched time units, a `None` scalar or non-timestamp operands.
+        (_, _) => {
+            return Err(DataFusionError::Internal(format!(
+                "Invalid array - scalar types for Timestamp subtraction: {:?} - {:?}",
+                array.data_type(),
+                scalar.get_datatype()
+            )));
+        }
+    };
+    Ok(ret)
+}
+
+/// This function handles timestamp - timestamp operations where the former is
+/// a scalar and the latter is an array, resulting in an array.
+///
+/// The scalar and the array must have the same time unit and the scalar must
+/// hold a value; otherwise an internal error is returned. Second and
+/// millisecond inputs produce an `IntervalDayTimeType` array, microsecond and
+/// nanosecond inputs produce an `IntervalMonthDayNanoType` array. The time
+/// zones of both operands are forwarded to `sub_timestamp_left_scalar_macro`.
+pub fn scalar_ts_sub_ts(scalar: &ScalarValue, array: &ArrayRef) -> Result<ArrayRef> {
+    let ret = match (scalar, array.data_type()) {
+        (
+            ScalarValue::TimestampSecond(Some(lhs), opt_tz_lhs),
+            DataType::Timestamp(TimeUnit::Second, opt_tz_rhs),
+        ) => {
+            sub_timestamp_left_scalar_macro!(
+                array,
+                lhs,
+                as_timestamp_second_array,
+                IntervalDayTimeType,
+                opt_tz_lhs.as_deref(),
+                1000,
+                opt_tz_rhs.as_deref(),
+                MILLISECOND_MODE,
+                seconds_sub,
+                NaiveDateTime::timestamp
+            )
+        }
+        (
+            ScalarValue::TimestampMillisecond(Some(lhs), opt_tz_lhs),
+            DataType::Timestamp(TimeUnit::Millisecond, opt_tz_rhs),
+        ) => {
+            sub_timestamp_left_scalar_macro!(
+                array,
+                lhs,
+                as_timestamp_millisecond_array,
+                IntervalDayTimeType,
+                opt_tz_lhs.as_deref(),
+                1,
+                opt_tz_rhs.as_deref(),
+                MILLISECOND_MODE,
+                milliseconds_sub,
+                NaiveDateTime::timestamp_millis
+            )
+        }
+        (
+            ScalarValue::TimestampMicrosecond(Some(lhs), opt_tz_lhs),
+            DataType::Timestamp(TimeUnit::Microsecond, opt_tz_rhs),
+        ) => {
+            sub_timestamp_left_scalar_macro!(
+                array,
+                lhs,
+                as_timestamp_microsecond_array,
+                IntervalMonthDayNanoType,
+                opt_tz_lhs.as_deref(),
+                1000,
+                opt_tz_rhs.as_deref(),
+                NANOSECOND_MODE,
+                microseconds_sub,
+                NaiveDateTime::timestamp_micros
+            )
+        }
+        (
+            ScalarValue::TimestampNanosecond(Some(lhs), opt_tz_lhs),
+            DataType::Timestamp(TimeUnit::Nanosecond, opt_tz_rhs),
+        ) => {
+            sub_timestamp_left_scalar_macro!(
+                array,
+                lhs,
+                as_timestamp_nanosecond_array,
+                IntervalMonthDayNanoType,
+                opt_tz_lhs.as_deref(),
+                1,
+                opt_tz_rhs.as_deref(),
+                NANOSECOND_MODE,
+                nanoseconds_sub,
+                NaiveDateTime::timestamp_nanos
+            )
+        }
+        // Mismatched time units, a `None` scalar or non-timestamp operands.
+        (_, _) => {
+            return Err(DataFusionError::Internal(format!(
+                "Invalid scalar - array types for Timestamp subtraction: {:?} - {:?}",
+                scalar.get_datatype(),
+                array.data_type()
+            )));
+        }
+    };
+    Ok(ret)
+}
+
+/// This function handles timestamp +/- interval operations where the former is
+/// an array and the latter is a scalar, resulting in an array.
+///
+/// The time unit of `array` selects the `*_add` function that is applied to
+/// every element together with `scalar` and `sign`. The output has the same
+/// timestamp type and time zone as `array`. An internal error is returned if
+/// `array` is not a timestamp array.
+pub fn ts_op_scalar_interval(
+    array: &ArrayRef,
+    sign: i32,
+    scalar: &ScalarValue,
+) -> Result<ArrayRef> {
+    let ret = match array.data_type() {
+        DataType::Timestamp(TimeUnit::Second, tz) => {
+            op_timestamp_interval_macro!(
+                array,
+                as_timestamp_second_array,
+                TimestampSecondType,
+                seconds_add,
+                scalar,
+                sign,
+                tz
+            )
+        }
+        DataType::Timestamp(TimeUnit::Millisecond, tz) => {
+            op_timestamp_interval_macro!(
+                array,
+                as_timestamp_millisecond_array,
+                TimestampMillisecondType,
+                milliseconds_add,
+                scalar,
+                sign,
+                tz
+            )
+        }
+        DataType::Timestamp(TimeUnit::Microsecond, tz) => {
+            op_timestamp_interval_macro!(
+                array,
+                as_timestamp_microsecond_array,
+                TimestampMicrosecondType,
+                microseconds_add,
+                scalar,
+                sign,
+                tz
+            )
+        }
+        DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
+            op_timestamp_interval_macro!(
+                array,
+                as_timestamp_nanosecond_array,
+                TimestampNanosecondType,
+                nanoseconds_add,
+                scalar,
+                sign,
+                tz
+            )
+        }
+        _ => Err(DataFusionError::Internal(format!(
+            "Invalid lhs type for Timestamp vs Interval operations: {}",
+            array.data_type()
+        )))?,
+    };
+    Ok(ret)
+}
+
+/// This function handles timestamp +/- interval operations where the former is
+/// a scalar and the latter is an array, resulting in an array.
+///
+/// The scalar is first converted to a chrono `NaiveDateTime`; every interval
+/// in `array` is then applied to it with the sign `sign` using `shift_months`
+/// (YearMonth), `add_day_time` (DayTime) or `add_m_d_nano` (MonthDayNano).
+/// The output array has the same time unit and time zone as the scalar.
+///
+/// # Errors
+/// Returns an internal error if the operands are not a (non-null) timestamp
+/// scalar and an interval array, or if the scalar cannot be represented as a
+/// `NaiveDateTime`.
+pub fn scalar_ts_op_interval(
+    scalar: &ScalarValue,
+    sign: i32,
+    array: &ArrayRef,
+) -> Result<ArrayRef> {
+    use DataType::*;
+    use IntervalUnit::*;
+    use ScalarValue::*;
+    let ret = match (scalar, array.data_type()) {
+        // Second op YearMonth
+        (TimestampSecond(Some(ts_sec), tz), Interval(YearMonth)) => {
+            let naive_date = NaiveDateTime::from_timestamp_opt(*ts_sec, 0);
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_ym_array,
+                TimestampSecondType,
+                IntervalYearMonthType,
+                shift_months,
+                NaiveDateTime::timestamp
+            )
+        }
+        // Millisecond op YearMonth
+        (TimestampMillisecond(Some(ts_ms), tz), Interval(YearMonth)) => {
+            let naive_date = NaiveDateTime::from_timestamp_millis(*ts_ms);
+            // The output type and back-caster must match the scalar's unit,
+            // otherwise the result is a second-resolution array.
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_ym_array,
+                TimestampMillisecondType,
+                IntervalYearMonthType,
+                shift_months,
+                NaiveDateTime::timestamp_millis
+            )
+        }
+        // Microsecond op YearMonth
+        (TimestampMicrosecond(Some(ts_us), tz), Interval(YearMonth)) => {
+            let naive_date = NaiveDateTime::from_timestamp_micros(*ts_us);
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_ym_array,
+                TimestampMicrosecondType,
+                IntervalYearMonthType,
+                shift_months,
+                NaiveDateTime::timestamp_micros
+            )
+        }
+        // Nanosecond op YearMonth
+        (TimestampNanosecond(Some(ts_ns), tz), Interval(YearMonth)) => {
+            // Split into whole seconds and the non-negative nanosecond rest.
+            let naive_date = NaiveDateTime::from_timestamp_opt(
+                ts_ns.div_euclid(1_000_000_000),
+                ts_ns.rem_euclid(1_000_000_000).try_into().map_err(|_| {
+                    DataFusionError::Internal("Overflow of divison".to_string())
+                })?,
+            );
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_ym_array,
+                TimestampNanosecondType,
+                IntervalYearMonthType,
+                shift_months,
+                NaiveDateTime::timestamp_nanos
+            )
+        }
+        // Second op DayTime
+        (TimestampSecond(Some(ts_sec), tz), Interval(DayTime)) => {
+            let naive_date = NaiveDateTime::from_timestamp_opt(*ts_sec, 0);
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_dt_array,
+                TimestampSecondType,
+                IntervalDayTimeType,
+                add_day_time,
+                NaiveDateTime::timestamp
+            )
+        }
+        // Millisecond op DayTime
+        (TimestampMillisecond(Some(ts_ms), tz), Interval(DayTime)) => {
+            let naive_date = NaiveDateTime::from_timestamp_millis(*ts_ms);
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_dt_array,
+                TimestampMillisecondType,
+                IntervalDayTimeType,
+                add_day_time,
+                NaiveDateTime::timestamp_millis
+            )
+        }
+        // Microsecond op DayTime
+        (TimestampMicrosecond(Some(ts_us), tz), Interval(DayTime)) => {
+            let naive_date = NaiveDateTime::from_timestamp_micros(*ts_us);
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_dt_array,
+                TimestampMicrosecondType,
+                IntervalDayTimeType,
+                add_day_time,
+                NaiveDateTime::timestamp_micros
+            )
+        }
+        // Nanosecond op DayTime
+        (TimestampNanosecond(Some(ts_ns), tz), Interval(DayTime)) => {
+            let naive_date = NaiveDateTime::from_timestamp_opt(
+                ts_ns.div_euclid(1_000_000_000),
+                ts_ns.rem_euclid(1_000_000_000).try_into().map_err(|_| {
+                    DataFusionError::Internal("Overflow of divison".to_string())
+                })?,
+            );
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_dt_array,
+                TimestampNanosecondType,
+                IntervalDayTimeType,
+                add_day_time,
+                NaiveDateTime::timestamp_nanos
+            )
+        }
+        // Second op MonthDayNano
+        (TimestampSecond(Some(ts_sec), tz), Interval(MonthDayNano)) => {
+            let naive_date = NaiveDateTime::from_timestamp_opt(*ts_sec, 0);
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_mdn_array,
+                TimestampSecondType,
+                IntervalMonthDayNanoType,
+                add_m_d_nano,
+                NaiveDateTime::timestamp
+            )
+        }
+        // Millisecond op MonthDayNano
+        (TimestampMillisecond(Some(ts_ms), tz), Interval(MonthDayNano)) => {
+            let naive_date = NaiveDateTime::from_timestamp_millis(*ts_ms);
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_mdn_array,
+                TimestampMillisecondType,
+                IntervalMonthDayNanoType,
+                add_m_d_nano,
+                NaiveDateTime::timestamp_millis
+            )
+        }
+        // Microsecond op MonthDayNano
+        (TimestampMicrosecond(Some(ts_us), tz), Interval(MonthDayNano)) => {
+            let naive_date = NaiveDateTime::from_timestamp_micros(*ts_us);
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_mdn_array,
+                TimestampMicrosecondType,
+                IntervalMonthDayNanoType,
+                add_m_d_nano,
+                NaiveDateTime::timestamp_micros
+            )
+        }
+
+        // Nanosecond op MonthDayNano
+        (TimestampNanosecond(Some(ts_ns), tz), Interval(MonthDayNano)) => {
+            let naive_date = NaiveDateTime::from_timestamp_opt(
+                ts_ns.div_euclid(1_000_000_000),
+                ts_ns.rem_euclid(1_000_000_000).try_into().map_err(|_| {
+                    DataFusionError::Internal("Overflow of divison".to_string())
+                })?,
+            );
+            scalar_ts_op_interval_macro!(
+                naive_date,
+                tz,
+                array,
+                sign,
+                as_interval_mdn_array,
+                TimestampNanosecondType,
+                IntervalMonthDayNanoType,
+                add_m_d_nano,
+                NaiveDateTime::timestamp_nanos
+            )
+        }
+        _ => Err(DataFusionError::Internal(
+            "Invalid types for Timestamp vs Interval operations".to_string(),
+        ))?,
+    };
+    Ok(ret)
+}
+
+/// This function handles interval +/- interval operations where the former is
+/// an array and the latter is a scalar, resulting in an interval array.
+///
+/// When both operands have the same interval type, `op_interval_macro` is
+/// used and the result keeps that type. When the types differ,
+/// `op_interval_cross_macro` is used and the result is an
+/// `IntervalMonthDayNanoType` array; the boolean literal passed to it is the
+/// `commute` flag, which is `true` when the scalar has to be passed as the
+/// first operand of the `op_*` function.
+///
+/// An internal error is returned for any other operand combination
+/// (including a scalar that does not hold a value).
+pub fn interval_op_scalar_interval(
+    array: &ArrayRef,
+    sign: i32,
+    scalar: &ScalarValue,
+) -> Result<ArrayRef> {
+    use DataType::*;
+    use IntervalUnit::*;
+    use ScalarValue::*;
+    let ret = match (array.data_type(), scalar) {
+        // YearMonth op YearMonth
+        (Interval(YearMonth), IntervalYearMonth(Some(rhs))) => {
+            op_interval_macro!(
+                array,
+                as_interval_ym_array,
+                IntervalYearMonthType,
+                op_ym,
+                rhs,
+                sign
+            )
+        }
+        // YearMonth op DayTime
+        (Interval(YearMonth), IntervalDayTime(Some(rhs))) => {
+            op_interval_cross_macro!(
+                array,
+                as_interval_ym_array,
+                false,
+                op_ym_dt,
+                rhs,
+                sign,
+                i32,
+                i64
+            )
+        }
+        // YearMonth op MonthDayNano
+        (Interval(YearMonth), IntervalMonthDayNano(Some(rhs))) => {
+            op_interval_cross_macro!(
+                array,
+                as_interval_ym_array,
+                false,
+                op_ym_mdn,
+                rhs,
+                sign,
+                i32,
+                i128
+            )
+        }
+        // DayTime op YearMonth
+        (Interval(DayTime), IntervalYearMonth(Some(rhs))) => {
+            op_interval_cross_macro!(
+                array,
+                as_interval_dt_array,
+                true,
+                op_ym_dt,
+                rhs,
+                sign,
+                i32,
+                i64
+            )
+        }
+        // DayTime op DayTime
+        (Interval(DayTime), IntervalDayTime(Some(rhs))) => {
+            op_interval_macro!(
+                array,
+                as_interval_dt_array,
+                IntervalDayTimeType,
+                op_dt,
+                rhs,
+                sign
+            )
+        }
+        // DayTime op MonthDayNano
+        (Interval(DayTime), IntervalMonthDayNano(Some(rhs))) => {
+            op_interval_cross_macro!(
+                array,
+                as_interval_dt_array,
+                false,
+                op_dt_mdn,
+                rhs,
+                sign,
+                i64,
+                i128
+            )
+        }
+        // MonthDayNano op YearMonth
+        (Interval(MonthDayNano), IntervalYearMonth(Some(rhs))) => {
+            op_interval_cross_macro!(
+                array,
+                as_interval_mdn_array,
+                true,
+                op_ym_mdn,
+                rhs,
+                sign,
+                i32,
+                i128
+            )
+        }
+        // MonthDayNano op DayTime
+        (Interval(MonthDayNano), IntervalDayTime(Some(rhs))) => {
+            op_interval_cross_macro!(
+                array,
+                as_interval_mdn_array,
+                true,
+                op_dt_mdn,
+                rhs,
+                sign,
+                i64,
+                i128
+            )
+        }
+        // MonthDayNano op MonthDayNano
+        (Interval(MonthDayNano), IntervalMonthDayNano(Some(rhs))) => {
+            op_interval_macro!(
+                array,
+                as_interval_mdn_array,
+                IntervalMonthDayNanoType,
+                op_mdn,
+                rhs,
+                sign
+            )
+        }
+        _ => Err(DataFusionError::Internal(format!(
+            "Invalid operands for Interval vs Interval operations: {} - {}",
+            array.data_type(),
+            scalar.get_datatype(),
+        )))?,
+    };
+    Ok(ret)
+}
+
+/// Computes `scalar (+/-) array` for intervals: the scalar is the left operand and each array
+/// element is the right operand. Every supported unit pairing yields an interval array.
+pub fn scalar_interval_op_interval(
+    scalar: &ScalarValue,
+    sign: i32, // forwarded unchanged to the `op_*` helpers; presumably +1 adds and -1 subtracts
+    array: &ArrayRef,
+) -> Result<ArrayRef> {
+    use DataType::*;
+    use IntervalUnit::*;
+    use ScalarValue::*;
+    let ret = match (scalar, array.data_type()) {
+        // YearMonth op YearMonth -> YearMonth
+        (IntervalYearMonth(Some(lhs)), Interval(YearMonth)) => {
+            let array = as_interval_ym_array(&array)?;
+            let ret: PrimitiveArray<IntervalYearMonthType> =
+                unary(array, |rhs| op_ym(*lhs, rhs, sign));
+            Arc::new(ret) as _
+        }
+        // DayTime op YearMonth -> MonthDayNano
+        (IntervalDayTime(Some(lhs)), Interval(YearMonth)) => {
+            let array = as_interval_ym_array(&array)?;
+            let ret: PrimitiveArray<IntervalMonthDayNanoType> =
+                unary(array, |rhs| op_ym_dt(rhs, *lhs, sign, true)); // operands passed as (rhs, lhs); the flag is `true` on exactly such swapped calls
+            Arc::new(ret) as _
+        }
+        // MonthDayNano op YearMonth -> MonthDayNano
+        (IntervalMonthDayNano(Some(lhs)), Interval(YearMonth)) => {
+            let array = as_interval_ym_array(&array)?;
+            let ret: PrimitiveArray<IntervalMonthDayNanoType> =
+                unary(array, |rhs| op_ym_mdn(rhs, *lhs, sign, true)); // swapped call, flag `true`
+            Arc::new(ret) as _
+        }
+        // YearMonth op DayTime -> MonthDayNano
+        (IntervalYearMonth(Some(lhs)), Interval(DayTime)) => {
+            let array = as_interval_dt_array(&array)?;
+            let ret: PrimitiveArray<IntervalMonthDayNanoType> =
+                unary(array, |rhs| op_ym_dt(*lhs, rhs, sign, false)); // operands in (lhs, rhs) order, flag `false`
+            Arc::new(ret) as _
+        }
+        // DayTime op DayTime -> DayTime
+        (IntervalDayTime(Some(lhs)), Interval(DayTime)) => {
+            let array = as_interval_dt_array(&array)?;
+            let ret: PrimitiveArray<IntervalDayTimeType> =
+                unary(array, |rhs| op_dt(*lhs, rhs, sign));
+            Arc::new(ret) as _
+        }
+        // MonthDayNano op DayTime -> MonthDayNano
+        (IntervalMonthDayNano(Some(lhs)), Interval(DayTime)) => {
+            let array = as_interval_dt_array(&array)?;
+            let ret: PrimitiveArray<IntervalMonthDayNanoType> =
+                unary(array, |rhs| op_dt_mdn(rhs, *lhs, sign, true)); // swapped call, flag `true`
+            Arc::new(ret) as _
+        }
+        // YearMonth op MonthDayNano -> MonthDayNano
+        (IntervalYearMonth(Some(lhs)), Interval(MonthDayNano)) => {
+            let array = as_interval_mdn_array(&array)?;
+            let ret: PrimitiveArray<IntervalMonthDayNanoType> =
+                unary(array, |rhs| op_ym_mdn(*lhs, rhs, sign, false));
+            Arc::new(ret) as _
+        }
+        // DayTime op MonthDayNano -> MonthDayNano
+        (IntervalDayTime(Some(lhs)), Interval(MonthDayNano)) => {
+            let array = as_interval_mdn_array(&array)?;
+            let ret: PrimitiveArray<IntervalMonthDayNanoType> =
+                unary(array, |rhs| op_dt_mdn(*lhs, rhs, sign, false));
+            Arc::new(ret) as _
+        }
+        // MonthDayNano op MonthDayNano -> MonthDayNano
+        (IntervalMonthDayNano(Some(lhs)), Interval(MonthDayNano)) => {
+            let array = as_interval_mdn_array(&array)?;
+            let ret: PrimitiveArray<IntervalMonthDayNanoType> =
+                unary(array, |rhs| op_mdn(*lhs, rhs, sign));
+            Arc::new(ret) as _
+        }
+        _ => Err(DataFusionError::Internal(format!( // any other pairing, including a `None` scalar, is rejected
+            "Invalid operands for Interval vs Interval operations: {} - {}",
+            scalar.get_datatype(),
+            array.data_type(),
+        )))?,
+    };
+    Ok(ret)
+}
+
+/// Performs a timestamp subtraction on two arrays of the same time unit and returns the resulting array.
+pub fn ts_array_op(array_lhs: &ArrayRef, array_rhs: &ArrayRef) -> Result<ArrayRef> {
+    use DataType::*;
+    use TimeUnit::*;
+    match (array_lhs.data_type(), array_rhs.data_type()) { // only same-unit pairs match; each side's time zone is handed to `ts_sub_op!`
+        (Timestamp(Second, opt_tz_lhs), Timestamp(Second, opt_tz_rhs)) => Ok(ts_sub_op!(
+            array_lhs,
+            array_rhs,
+            opt_tz_lhs,
+            opt_tz_rhs,
+            1000i64, // presumably the factor scaling seconds to the millisecond mode — confirm in `ts_sub_op!`
+            as_timestamp_second_array,
+            seconds_sub,
+            NaiveDateTime::timestamp,
+            MILLISECOND_MODE,
+            IntervalDayTimeType // Second and Millisecond arms are paired with IntervalDayTimeType
+        )),
+        (Timestamp(Millisecond, opt_tz_lhs), Timestamp(Millisecond, opt_tz_rhs)) => {
+            Ok(ts_sub_op!(
+                array_lhs,
+                array_rhs,
+                opt_tz_lhs,
+                opt_tz_rhs,
+                1i64,
+                as_timestamp_millisecond_array,
+                milliseconds_sub,
+                NaiveDateTime::timestamp_millis,
+                MILLISECOND_MODE,
+                IntervalDayTimeType
+            ))
+        }
+        (Timestamp(Microsecond, opt_tz_lhs), Timestamp(Microsecond, opt_tz_rhs)) => {
+            Ok(ts_sub_op!(
+                array_lhs,
+                array_rhs,
+                opt_tz_lhs,
+                opt_tz_rhs,
+                1000i64, // presumably the factor scaling microseconds to the nanosecond mode — confirm in `ts_sub_op!`
+                as_timestamp_microsecond_array,
+                microseconds_sub,
+                NaiveDateTime::timestamp_micros,
+                NANOSECOND_MODE,
+                IntervalMonthDayNanoType // Microsecond and Nanosecond arms are paired with IntervalMonthDayNanoType
+            ))
+        }
+        (Timestamp(Nanosecond, opt_tz_lhs), Timestamp(Nanosecond, opt_tz_rhs)) => {
+            Ok(ts_sub_op!(
+                array_lhs,
+                array_rhs,
+                opt_tz_lhs,
+                opt_tz_rhs,
+                1i64,
+                as_timestamp_nanosecond_array,
+                nanoseconds_sub,
+                NaiveDateTime::timestamp_nanos,
+                NANOSECOND_MODE,
+                IntervalMonthDayNanoType
+            ))
+        }
+        (_, _) => Err(DataFusionError::Execution(format!( // mixed units or non-timestamp inputs are rejected
+            "Invalid array types for Timestamp subtraction: {} - {}",
+            array_lhs.data_type(),
+            array_rhs.data_type()
+        ))),
+    }
+}
+/// Performs an interval (+/-) interval operation on two arrays and returns the resulting array.
+/// `sign` selects addition or subtraction and is handed to the `op_*` helper named in each arm.
+/// All nine pairings of YearMonth, DayTime and MonthDayNano are handled; others yield an `Execution` error.
+pub fn interval_array_op(
+    array_lhs: &ArrayRef,
+    array_rhs: &ArrayRef,
+    sign: i32,
+) -> Result<ArrayRef> {
+    use DataType::*;
+    use IntervalUnit::*;
+    match (array_lhs.data_type(), array_rhs.data_type()) {
+        (Interval(YearMonth), Interval(YearMonth)) => Ok(interval_op!(
+            array_lhs,
+            array_rhs,
+            as_interval_ym_array,
+            op_ym,
+            sign,
+            IntervalYearMonthType
+        )),
+        (Interval(YearMonth), Interval(DayTime)) => Ok(interval_cross_op!(
+            array_lhs,
+            array_rhs,
+            as_interval_ym_array,
+            as_interval_dt_array,
+            op_ym_dt,
+            sign,
+            false, // operands passed in (lhs, rhs) order
+            IntervalYearMonthType,
+            IntervalDayTimeType
+        )),
+        (Interval(YearMonth), Interval(MonthDayNano)) => Ok(interval_cross_op!(
+            array_lhs,
+            array_rhs,
+            as_interval_ym_array,
+            as_interval_mdn_array,
+            op_ym_mdn,
+            sign,
+            false,
+            IntervalYearMonthType,
+            IntervalMonthDayNanoType
+        )),
+        (Interval(DayTime), Interval(YearMonth)) => Ok(interval_cross_op!(
+            array_rhs, // operands are passed swapped so the YearMonth array comes first, matching `op_ym_dt`
+            array_lhs,
+            as_interval_ym_array,
+            as_interval_dt_array,
+            op_ym_dt,
+            sign,
+            true, // set on exactly the arms that swap the operands; presumably lets the helper restore lhs/rhs roles — confirm
+            IntervalYearMonthType,
+            IntervalDayTimeType
+        )),
+        (Interval(DayTime), Interval(DayTime)) => Ok(interval_op!(
+            array_lhs,
+            array_rhs,
+            as_interval_dt_array,
+            op_dt,
+            sign,
+            IntervalDayTimeType
+        )),
+        (Interval(DayTime), Interval(MonthDayNano)) => Ok(interval_cross_op!(
+            array_lhs,
+            array_rhs,
+            as_interval_dt_array,
+            as_interval_mdn_array,
+            op_dt_mdn,
+            sign,
+            false,
+            IntervalDayTimeType,
+            IntervalMonthDayNanoType
+        )),
+        (Interval(MonthDayNano), Interval(YearMonth)) => Ok(interval_cross_op!(
+            array_rhs, // swapped: the YearMonth array is passed first
+            array_lhs,
+            as_interval_ym_array,
+            as_interval_mdn_array,
+            op_ym_mdn,
+            sign,
+            true,
+            IntervalYearMonthType,
+            IntervalMonthDayNanoType
+        )),
+        (Interval(MonthDayNano), Interval(DayTime)) => Ok(interval_cross_op!(
+            array_rhs, // swapped: the DayTime array is passed first
+            array_lhs,
+            as_interval_dt_array,
+            as_interval_mdn_array,
+            op_dt_mdn,
+            sign,
+            true,
+            IntervalDayTimeType,
+            IntervalMonthDayNanoType
+        )),
+        (Interval(MonthDayNano), Interval(MonthDayNano)) => Ok(interval_op!(
+            array_lhs,
+            array_rhs,
+            as_interval_mdn_array,
+            op_mdn,
+            sign,
+            IntervalMonthDayNanoType
+        )),
+        (_, _) => Err(DataFusionError::Execution(format!( // the message prints the numeric `sign` between the two data types
+            "Invalid array types for Interval operation: {} {} {}",
+            array_lhs.data_type(),
+            sign,
+            array_rhs.data_type()
+        ))),
+    }
+}
+
+/// Performs a timestamp (+/-) interval operation on two arrays and returns the resulting array.
+/// `sign` selects addition or subtraction and is handed to `ts_interval_op!` with the arm's `*_add_array` helper.
+/// All twelve pairings of the four timestamp units and three interval units are handled; others yield an `Execution` error.
+pub fn ts_interval_array_op(
+    array_lhs: &ArrayRef, // timestamp operand
+    sign: i32,
+    array_rhs: &ArrayRef, // interval operand
+) -> Result<ArrayRef> {
+    use DataType::*;
+    use IntervalUnit::*;
+    use TimeUnit::*;
+    match (array_lhs.data_type(), array_rhs.data_type()) {
+        (Timestamp(Second, tz), Interval(YearMonth)) => Ok(ts_interval_op!(
+            array_lhs,
+            array_rhs,
+            tz, // the timestamp's time zone is passed through to the macro
+            as_timestamp_second_array,
+            as_interval_ym_array,
+            seconds_add_array::<YM_MODE>, // the const generic mode follows the interval unit (YM / DT / MDN)
+            sign,
+            TimestampSecondType,
+            IntervalYearMonthType
+        )),
+        (Timestamp(Second, tz), Interval(DayTime)) => Ok(ts_interval_op!(
+            array_lhs,
+            array_rhs,
+            tz,
+            as_timestamp_second_array,
+            as_interval_dt_array,
+            seconds_add_array::<DT_MODE>,
+            sign,
+            TimestampSecondType,
+            IntervalDayTimeType
+        )),
+        (Timestamp(Second, tz), Interval(MonthDayNano)) => Ok(ts_interval_op!(
+            array_lhs,
+            array_rhs,
+            tz,
+            as_timestamp_second_array,
+            as_interval_mdn_array,
+            seconds_add_array::<MDN_MODE>,
+            sign,
+            TimestampSecondType,
+            IntervalMonthDayNanoType
+        )),
+        (Timestamp(Millisecond, tz), Interval(YearMonth)) => Ok(ts_interval_op!(
+            array_lhs,
+            array_rhs,
+            tz,
+            as_timestamp_millisecond_array,
+            as_interval_ym_array,
+            milliseconds_add_array::<YM_MODE>,
+            sign,
+            TimestampMillisecondType,
+            IntervalYearMonthType
+        )),
+        (Timestamp(Millisecond, tz), Interval(DayTime)) => Ok(ts_interval_op!(
+            array_lhs,
+            array_rhs,
+            tz,
+            as_timestamp_millisecond_array,
+            as_interval_dt_array,
+            milliseconds_add_array::<DT_MODE>,
+            sign,
+            TimestampMillisecondType,
+            IntervalDayTimeType
+        )),
+        (Timestamp(Millisecond, tz), Interval(MonthDayNano)) => Ok(ts_interval_op!(
+            array_lhs,
+            array_rhs,
+            tz,
+            as_timestamp_millisecond_array,
+            as_interval_mdn_array,
+            milliseconds_add_array::<MDN_MODE>,
+            sign,
+            TimestampMillisecondType,
+            IntervalMonthDayNanoType
+        )),
+        (Timestamp(Microsecond, tz), Interval(YearMonth)) => Ok(ts_interval_op!(
+            array_lhs,
+            array_rhs,
+            tz,
+            as_timestamp_microsecond_array,
+            as_interval_ym_array,
+            microseconds_add_array::<YM_MODE>,
+            sign,
+            TimestampMicrosecondType,
+            IntervalYearMonthType
+        )),
+        (Timestamp(Microsecond, tz), Interval(DayTime)) => Ok(ts_interval_op!(
+            array_lhs,
+            array_rhs,
+            tz,
+            as_timestamp_microsecond_array,
+            as_interval_dt_array,
+            microseconds_add_array::<DT_MODE>,
+            sign,
+            TimestampMicrosecondType,
+            IntervalDayTimeType
+        )),
+        (Timestamp(Microsecond, tz), Interval(MonthDayNano)) => Ok(ts_interval_op!(
+            array_lhs,
+            array_rhs,
+            tz,
+            as_timestamp_microsecond_array,
+            as_interval_mdn_array,
+            microseconds_add_array::<MDN_MODE>,
+            sign,
+            TimestampMicrosecondType,
+            IntervalMonthDayNanoType
+        )),
+        (Timestamp(Nanosecond, tz), Interval(YearMonth)) => Ok(ts_interval_op!(
+            array_lhs,
+            array_rhs,
+            tz,
+            as_timestamp_nanosecond_array,
+            as_interval_ym_array,
+            nanoseconds_add_array::<YM_MODE>,
+            sign,
+            TimestampNanosecondType,
+            IntervalYearMonthType
+        )),
+        (Timestamp(Nanosecond, tz), Interval(DayTime)) => Ok(ts_interval_op!(
+            array_lhs,
+            array_rhs,
+            tz,
+            as_timestamp_nanosecond_array,
+            as_interval_dt_array,
+            nanoseconds_add_array::<DT_MODE>,
+            sign,
+            TimestampNanosecondType,
+            IntervalDayTimeType
+        )),
+        (Timestamp(Nanosecond, tz), Interval(MonthDayNano)) => Ok(ts_interval_op!(
+            array_lhs,
+            array_rhs,
+            tz,
+            as_timestamp_nanosecond_array,
+            as_interval_mdn_array,
+            nanoseconds_add_array::<MDN_MODE>,
+            sign,
+            TimestampNanosecondType,
+            IntervalMonthDayNanoType
+        )),
+        (_, _) => Err(DataFusionError::Execution(format!( // the message prints the numeric `sign` between the two data types
+            "Invalid array types for Timestamp Interval operation: {} {} {}",
+            array_lhs.data_type(),
+            sign,
+            array_rhs.data_type()
+        ))),
+    }
+}
+
+#[inline] // Builds a Date32 array by applying `month_op(prior, months)` for every YearMonth interval in `right`.
+pub fn date32_interval_ym_op(
+    right: &Arc<dyn Array>, // interval array, read via `as_primitive` as IntervalYearMonthType
+    epoch: &NaiveDate, // reference date that the output day counts are measured from
+    prior: &NaiveDate, // date operand that each interval is applied to
+    month_op: fn(NaiveDate, Months) -> Option<NaiveDate>, // a `None` result is reported as out of range
+) -> Result<ArrayRef> {
+    let right: &PrimitiveArray<IntervalYearMonthType> = right.as_primitive();
+    let ret = Arc::new(try_unary::<IntervalYearMonthType, _, Date32Type>(
+        right,
+        |ym| {
+            let months = Months::new(ym.try_into().map_err(|_| { // the conversion fails for values that do not fit the unsigned month count (e.g. negatives)
+                DataFusionError::Internal(
+                    "Interval values cannot be casted as unsigned integers".to_string(),
+                )
+            })?);
+            let value = month_op(*prior, months).ok_or_else(|| {
+                DataFusionError::Internal("Resulting date is out of range".to_string())
+            })?;
+            Ok((value - *epoch).num_days() as i32) // whole days between the resulting date and `epoch`
+        },
+    )?) as _;
+    Ok(ret)
+}
+
+#[inline] // Builds a Date32 array by applying `day_op(prior, days)` for every DayTime interval in `right`.
+pub fn date32_interval_dt_op(
+    right: &Arc<dyn Array>, // interval array, read via `as_primitive` as IntervalDayTimeType
+    epoch: &NaiveDate, // reference date that the output day counts are measured from
+    prior: &NaiveDate, // date operand that each interval is applied to
+    day_op: fn(NaiveDate, Days) -> Option<NaiveDate>, // a `None` result is reported as out of range
+) -> Result<ArrayRef> {
+    let right: &PrimitiveArray<IntervalDayTimeType> = right.as_primitive();
+    let ret = Arc::new(try_unary::<IntervalDayTimeType, _, Date32Type>(
+        right,
+        |dt| {
+            let (days, millis) = IntervalDayTimeType::to_parts(dt);
+            let days = Days::new(days.try_into().map_err(|_| { // the conversion fails for day counts that are not representable as unsigned (e.g. negatives)
+                DataFusionError::Internal(
+                    "Interval values cannot be casted as unsigned integers".to_string(),
+                )
+            })?);
+            let value = day_op(*prior, days).ok_or_else(|| {
+                DataFusionError::Internal("Resulting date is out of range".to_string())
+            })?;
+            let milli_days = millis as i64 / MILLISECONDS_IN_DAY; // integer division: only whole days contained in `millis` count
+            Ok(((value - *epoch).num_days() - milli_days) as i32) // NOTE(review): `milli_days` is always subtracted, whatever `day_op` does — looks intended for subtraction callers only; confirm
+        },
+    )?) as _;
+    Ok(ret)
+}
+
+#[inline] // Builds a Date32 array by applying `month_op` and then `day_op` to `prior` for every MonthDayNano interval in `right`.
+pub fn date32_interval_mdn_op(
+    right: &Arc<dyn Array>, // interval array, read via `as_primitive` as IntervalMonthDayNanoType
+    epoch: &NaiveDate, // reference date that the output day counts are measured from
+    prior: &NaiveDate, // date operand that each interval is applied to
+    day_op: fn(NaiveDate, Days) -> Option<NaiveDate>, // applied second, to the month-shifted date
+    month_op: fn(NaiveDate, Months) -> Option<NaiveDate>, // applied first, directly to `prior`
+) -> Result<ArrayRef> {
+    let cast_err = |_| { // shared error for month/day parts that cannot be converted to unsigned counts
+        DataFusionError::Internal(
+            "Interval values cannot be casted as unsigned integers".to_string(),
+        )
+    };
+    let out_of_range = // shared error for a `None` result from `month_op` or `day_op`
+        || DataFusionError::Internal("Resulting date is out of range".to_string());
+    let right: &PrimitiveArray<IntervalMonthDayNanoType> = right.as_primitive();
+    let ret = Arc::new(try_unary::<IntervalMonthDayNanoType, _, Date32Type>(
+        right,
+        |mdn| {
+            let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(mdn);
+            let months_obj = Months::new(months.try_into().map_err(cast_err)?);
+            let month_diff = month_op(*prior, months_obj).ok_or_else(out_of_range)?;
+            let days_obj = Days::new(days.try_into().map_err(cast_err)?);
+            let value = day_op(month_diff, days_obj).ok_or_else(out_of_range)?;
+            let nano_days = nanos / NANOSECONDS_IN_DAY; // integer division: only whole days contained in `nanos` count
+            Ok(((value - *epoch).num_days() - nano_days) as i32) // NOTE(review): `nano_days` is always subtracted, whatever the ops do — looks intended for subtraction callers only; confirm
+        },
+    )?) as _;
+    Ok(ret)
+}
+
+#[inline] // Builds a Date64 array by applying `month_op(prior, months)` for every YearMonth interval in `right`.
+pub fn date64_interval_ym_op(
+    right: &Arc<dyn Array>, // interval array, read via `as_primitive` as IntervalYearMonthType
+    epoch: &NaiveDate, // reference date that the output millisecond counts are measured from
+    prior: &NaiveDate, // date operand that each interval is applied to
+    month_op: fn(NaiveDate, Months) -> Option<NaiveDate>, // a `None` result is reported as out of range
+) -> Result<ArrayRef> {
+    let right: &PrimitiveArray<IntervalYearMonthType> = right.as_primitive();
+    let ret = Arc::new(try_unary::<IntervalYearMonthType, _, Date64Type>(
+        right,
+        |ym| {
+            let months_obj = Months::new(ym.try_into().map_err(|_| { // the conversion fails for values that do not fit the unsigned month count (e.g. negatives)
+                DataFusionError::Internal(
+                    "Interval values cannot be casted as unsigned integers".to_string(),
+                )
+            })?);
+            let date = month_op(*prior, months_obj).ok_or_else(|| {
+                DataFusionError::Internal("Resulting date is out of range".to_string())
+            })?;
+            Ok((date - *epoch).num_milliseconds()) // milliseconds between the resulting date and `epoch`
+        },
+    )?) as _;
+    Ok(ret)
+}
+
+#[inline] // Builds a Date64 array by applying `day_op(prior, days)` for every DayTime interval in `right`.
+pub fn date64_interval_dt_op(
+    right: &Arc<dyn Array>, // interval array, read via `as_primitive` as IntervalDayTimeType
+    epoch: &NaiveDate, // reference date that the output millisecond counts are measured from
+    prior: &NaiveDate, // date operand that each interval is applied to
+    day_op: fn(NaiveDate, Days) -> Option<NaiveDate>, // a `None` result is reported as out of range
+) -> Result<ArrayRef> {
+    let right: &PrimitiveArray<IntervalDayTimeType> = right.as_primitive();
+    let ret = Arc::new(try_unary::<IntervalDayTimeType, _, Date64Type>(
+        right,
+        |dt| {
+            let (days, millis) = IntervalDayTimeType::to_parts(dt);
+            let days_obj = Days::new(days.try_into().map_err(|_| { // the conversion fails for day counts that are not representable as unsigned (e.g. negatives)
+                DataFusionError::Internal(
+                    "Interval values cannot be casted as unsigned integers".to_string(),
+                )
+            })?);
+            let date = day_op(*prior, days_obj).ok_or_else(|| {
+                DataFusionError::Internal("Resulting date is out of range".to_string())
+            })?;
+            Ok((date - *epoch).num_milliseconds() - millis as i64) // NOTE(review): `millis` is always subtracted, whatever `day_op` does — looks intended for subtraction callers only; confirm
+        },
+    )?) as _;
+    Ok(ret)
+}
+
+#[inline] // Builds a Date64 array by applying `month_op` and then `day_op` to `prior` for every MonthDayNano interval in `right`.
+pub fn date64_interval_mdn_op(
+    right: &Arc<dyn Array>, // interval array, read via `as_primitive` as IntervalMonthDayNanoType
+    epoch: &NaiveDate, // reference date that the output millisecond counts are measured from
+    prior: &NaiveDate, // date operand that each interval is applied to
+    day_op: fn(NaiveDate, Days) -> Option<NaiveDate>, // applied second, to the month-shifted date
+    month_op: fn(NaiveDate, Months) -> Option<NaiveDate>, // applied first, directly to `prior`
+) -> Result<ArrayRef> {
+    let cast_err = |_| { // shared error for month/day parts that cannot be converted to unsigned counts
+        DataFusionError::Internal(
+            "Interval values cannot be casted as unsigned integers".to_string(),
+        )
+    };
+    let out_of_range = // shared error for a `None` result from `month_op` or `day_op`
+        || DataFusionError::Internal("Resulting date is out of range".to_string());
+    let right: &PrimitiveArray<IntervalMonthDayNanoType> = right.as_primitive();
+    let ret = Arc::new(try_unary::<IntervalMonthDayNanoType, _, Date64Type>(
+        right,
+        |mdn| {
+            let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(mdn);
+            let months_obj = Months::new(months.try_into().map_err(cast_err)?);
+            let month_diff = month_op(*prior, months_obj).ok_or_else(out_of_range)?;
+            let days_obj = Days::new(days.try_into().map_err(cast_err)?);
+            let value = day_op(month_diff, days_obj).ok_or_else(out_of_range)?;
+            Ok((value - *epoch).num_milliseconds() - nanos / 1_000_000) // nanos are truncated to whole milliseconds; NOTE(review): always subtracted, whatever the ops do — confirm callers
+        },
+    )?) as _;
+    Ok(ret)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/physical-expr/src/expressions/datetime.rs b/datafusion/physical-expr/src/expressions/datetime.rs
index 976ef845aa..f1933c1d18 100644
--- a/datafusion/physical-expr/src/expressions/datetime.rs
+++ b/datafusion/physical-expr/src/expressions/datetime.rs
@@ -117,16 +117,17 @@ impl PhysicalExpr for DateTimeIntervalExpr {
             // This function evaluates temporal array vs scalar operations, such as timestamp - timestamp,
             // interval + interval, timestamp + interval, and interval + timestamp. It takes one array and one scalar as input
             // and an integer sign representing the operation (+1 for addition and -1 for subtraction).
-            (ColumnarValue::Array(array_lhs), ColumnarValue::Scalar(array_rhs)) => {
-                resolve_temporal_op_scalar(&array_lhs, sign, &array_rhs)
+            (ColumnarValue::Array(arr), ColumnarValue::Scalar(scalar)) => {
+                Ok(ColumnarValue::Array(resolve_temporal_op_scalar(
+                    &arr, sign, &scalar, false,
+                )?))
             }
             // This function evaluates operations between a scalar value and an array of temporal
             // values. One example is calculating the duration between a scalar timestamp and an
             // array of timestamps (i.e. `now() - some_column`).
-            (ColumnarValue::Scalar(scalar_lhs), ColumnarValue::Array(array_rhs)) => {
-                let array_lhs = scalar_lhs.to_array_of_size(array_rhs.len());
-                Ok(ColumnarValue::Array(resolve_temporal_op(
-                    &array_lhs, sign, &array_rhs,
+            (ColumnarValue::Scalar(scalar), ColumnarValue::Array(arr)) => {
+                Ok(ColumnarValue::Array(resolve_temporal_op_scalar(
+                    &arr, sign, &scalar, true,
                 )?))
             }
             // This function evaluates temporal array operations, such as timestamp - timestamp, interval + interval,
@@ -596,59 +597,77 @@ mod tests {
         let interval_array = interval_scalar.to_array();
 
         // timestamp + interval
-        if let ColumnarValue::Array(res1) =
-            resolve_temporal_op_scalar(&timestamp_array, 1, &interval_scalar)?
-        {
-            let res2 = timestamp_scalar.add(&interval_scalar)?.to_array();
-            assert_eq!(
-                &res1, &res2,
-                "Timestamp Scalar={timestamp_scalar} + Interval Scalar={interval_scalar}"
-            );
-        }
+        let res1 =
+            resolve_temporal_op_scalar(&timestamp_array, 1, &interval_scalar, false)?;
+        let res2 = timestamp_scalar.add(&interval_scalar)?.to_array();
+        assert_eq!(
+            &res1, &res2,
+            "Timestamp Scalar={timestamp_scalar} + Interval Scalar={interval_scalar}"
+        );
+        let res1 =
+            resolve_temporal_op_scalar(&timestamp_array, 1, &interval_scalar, true)?;
+        let res2 = interval_scalar.add(&timestamp_scalar)?.to_array();
+        assert_eq!(
+            &res1, &res2,
+            "Timestamp Scalar={timestamp_scalar} + Interval Scalar={interval_scalar}"
+        );
 
         // timestamp - interval
-        if let ColumnarValue::Array(res1) =
-            resolve_temporal_op_scalar(&timestamp_array, -1, &interval_scalar)?
-        {
-            let res2 = timestamp_scalar.sub(&interval_scalar)?.to_array();
-            assert_eq!(
-                &res1, &res2,
-                "Timestamp Scalar={timestamp_scalar} - Interval Scalar={interval_scalar}"
-            );
-        }
+        let res1 =
+            resolve_temporal_op_scalar(&timestamp_array, -1, &interval_scalar, false)?;
+        let res2 = timestamp_scalar.sub(&interval_scalar)?.to_array();
+        assert_eq!(
+            &res1, &res2,
+            "Timestamp Scalar={timestamp_scalar} - Interval Scalar={interval_scalar}"
+        );
 
         // timestamp - timestamp
-        if let ColumnarValue::Array(res1) =
-            resolve_temporal_op_scalar(&timestamp_array, -1, &timestamp_scalar)?
-        {
-            let res2 = timestamp_scalar.sub(&timestamp_scalar)?.to_array();
-            assert_eq!(
-                &res1, &res2,
-                "Timestamp Scalar={timestamp_scalar} - Timestamp Scalar={timestamp_scalar}"
-            );
-        }
+        let res1 =
+            resolve_temporal_op_scalar(&timestamp_array, -1, &timestamp_scalar, false)?;
+        let res2 = timestamp_scalar.sub(&timestamp_scalar)?.to_array();
+        assert_eq!(
+            &res1, &res2,
+            "Timestamp Scalar={timestamp_scalar} - Timestamp Scalar={timestamp_scalar}"
+        );
+        let res1 =
+            resolve_temporal_op_scalar(&timestamp_array, -1, &timestamp_scalar, true)?;
+        let res2 = timestamp_scalar.sub(&timestamp_scalar)?.to_array();
+        assert_eq!(
+            &res1, &res2,
+            "Timestamp Scalar={timestamp_scalar} - Timestamp Scalar={timestamp_scalar}"
+        );
 
         // interval - interval
-        if let ColumnarValue::Array(res1) =
-            resolve_temporal_op_scalar(&interval_array, -1, &interval_scalar)?
-        {
-            let res2 = interval_scalar.sub(&interval_scalar)?.to_array();
-            assert_eq!(
-                &res1, &res2,
-                "Interval Scalar={interval_scalar} - Interval Scalar={interval_scalar}"
-            );
-        }
+        let res1 =
+            resolve_temporal_op_scalar(&interval_array, -1, &interval_scalar, false)?;
+        let res2 = interval_scalar.sub(&interval_scalar)?.to_array();
+        assert_eq!(
+            &res1, &res2,
+            "Interval Scalar={interval_scalar} - Interval Scalar={interval_scalar}"
+        );
+        let res1 =
+            resolve_temporal_op_scalar(&interval_array, -1, &interval_scalar, true)?;
+        let res2 = interval_scalar.sub(&interval_scalar)?.to_array();
+        assert_eq!(
+            &res1, &res2,
+            "Interval Scalar={interval_scalar} - Interval Scalar={interval_scalar}"
+        );
 
         // interval + interval
-        if let ColumnarValue::Array(res1) =
-            resolve_temporal_op_scalar(&interval_array, 1, &interval_scalar)?
-        {
-            let res2 = interval_scalar.add(&interval_scalar)?.to_array();
-            assert_eq!(
-                &res1, &res2,
-                "Interval Scalar={interval_scalar} + Interval Scalar={interval_scalar}"
-            );
-        }
+        let res1 =
+            resolve_temporal_op_scalar(&interval_array, 1, &interval_scalar, false)?;
+        let res2 = interval_scalar.add(&interval_scalar)?.to_array();
+        assert_eq!(
+            &res1, &res2,
+            "Interval Scalar={interval_scalar} + Interval Scalar={interval_scalar}"
+        );
+        let res1 =
+            resolve_temporal_op_scalar(&interval_array, 1, &interval_scalar, true)?;
+        let res2 = interval_scalar.add(&interval_scalar)?.to_array();
+        assert_eq!(
+            &res1, &res2,
+            "Interval Scalar={interval_scalar} + Interval Scalar={interval_scalar}"
+        );
 
         Ok(())
     }
@@ -732,6 +751,175 @@ mod tests {
 
         experiment(timestamp_scalar, interval_scalar)?;
 
+        // More tests covering every pairing of timestamp unit and interval type
+        let timestamp_scalar = ScalarValue::TimestampSecond(
+            Some(
+                NaiveDate::from_ymd_opt(2000, 12, 31)
+                    .unwrap()
+                    .and_hms_opt(23, 59, 59)
+                    .unwrap()
+                    .timestamp(),
+            ),
+            None,
+        );
+        let interval_scalar = ScalarValue::new_interval_ym(0, 1);
+
+        experiment(timestamp_scalar, interval_scalar)?;
+
+        let timestamp_scalar = ScalarValue::TimestampSecond(
+            Some(
+                NaiveDate::from_ymd_opt(2000, 12, 31)
+                    .unwrap()
+                    .and_hms_opt(23, 59, 59)
+                    .unwrap()
+                    .timestamp(),
+            ),
+            None,
+        );
+        let interval_scalar = ScalarValue::new_interval_dt(10, 100000);
+
+        experiment(timestamp_scalar, interval_scalar)?;
+
+        let timestamp_scalar = ScalarValue::TimestampSecond(
+            Some(
+                NaiveDate::from_ymd_opt(2000, 12, 31)
+                    .unwrap()
+                    .and_hms_opt(23, 59, 59)
+                    .unwrap()
+                    .timestamp(),
+            ),
+            None,
+        );
+        let interval_scalar = ScalarValue::new_interval_mdn(13, 32, 123456);
+
+        experiment(timestamp_scalar, interval_scalar)?;
+
+        let timestamp_scalar = ScalarValue::TimestampMillisecond(
+            Some(
+                NaiveDate::from_ymd_opt(2000, 12, 31)
+                    .unwrap()
+                    .and_hms_milli_opt(23, 59, 59, 909)
+                    .unwrap()
+                    .timestamp_millis(),
+            ),
+            None,
+        );
+        let interval_scalar = ScalarValue::new_interval_ym(0, 1);
+
+        experiment(timestamp_scalar, interval_scalar)?;
+
+        let timestamp_scalar = ScalarValue::TimestampMillisecond(
+            Some(
+                NaiveDate::from_ymd_opt(2000, 12, 31)
+                    .unwrap()
+                    .and_hms_milli_opt(23, 59, 59, 909)
+                    .unwrap()
+                    .timestamp_millis(),
+            ),
+            None,
+        );
+        let interval_scalar = ScalarValue::new_interval_dt(10, 100000);
+
+        experiment(timestamp_scalar, interval_scalar)?;
+
+        let timestamp_scalar = ScalarValue::TimestampMillisecond(
+            Some(
+                NaiveDate::from_ymd_opt(2000, 12, 31)
+                    .unwrap()
+                    .and_hms_milli_opt(23, 59, 59, 909)
+                    .unwrap()
+                    .timestamp_millis(),
+            ),
+            None,
+        );
+        let interval_scalar = ScalarValue::new_interval_mdn(13, 32, 123456);
+
+        experiment(timestamp_scalar, interval_scalar)?;
+
+        let timestamp_scalar = ScalarValue::TimestampMicrosecond(
+            Some(
+                NaiveDate::from_ymd_opt(2000, 12, 31)
+                    .unwrap()
+                    .and_hms_micro_opt(23, 59, 59, 987654)
+                    .unwrap()
+                    .timestamp_micros(),
+            ),
+            None,
+        );
+        let interval_scalar = ScalarValue::new_interval_ym(0, 1);
+
+        experiment(timestamp_scalar, interval_scalar)?;
+
+        let timestamp_scalar = ScalarValue::TimestampMicrosecond(
+            Some(
+                NaiveDate::from_ymd_opt(2000, 12, 31)
+                    .unwrap()
+                    .and_hms_micro_opt(23, 59, 59, 987654)
+                    .unwrap()
+                    .timestamp_micros(),
+            ),
+            None,
+        );
+        let interval_scalar = ScalarValue::new_interval_dt(10, 100000);
+
+        experiment(timestamp_scalar, interval_scalar)?;
+
+        let timestamp_scalar = ScalarValue::TimestampMicrosecond(
+            Some(
+                NaiveDate::from_ymd_opt(2000, 12, 31)
+                    .unwrap()
+                    .and_hms_micro_opt(23, 59, 59, 987654)
+                    .unwrap()
+                    .timestamp_micros(),
+            ),
+            None,
+        );
+        let interval_scalar = ScalarValue::new_interval_mdn(13, 32, 123456);
+
+        experiment(timestamp_scalar, interval_scalar)?;
+
+        let timestamp_scalar = ScalarValue::TimestampNanosecond(
+            Some(
+                NaiveDate::from_ymd_opt(2000, 12, 31)
+                    .unwrap()
+                    .and_hms_nano_opt(23, 59, 59, 999999999)
+                    .unwrap()
+                    .timestamp_nanos(),
+            ),
+            None,
+        );
+        let interval_scalar = ScalarValue::new_interval_ym(0, 1);
+
+        experiment(timestamp_scalar, interval_scalar)?;
+
+        let timestamp_scalar = ScalarValue::TimestampNanosecond(
+            Some(
+                NaiveDate::from_ymd_opt(2000, 12, 31)
+                    .unwrap()
+                    .and_hms_nano_opt(23, 59, 59, 999999999)
+                    .unwrap()
+                    .timestamp_nanos(),
+            ),
+            None,
+        );
+        let interval_scalar = ScalarValue::new_interval_dt(10, 100000);
+
+        experiment(timestamp_scalar, interval_scalar)?;
+
+        let timestamp_scalar = ScalarValue::TimestampNanosecond(
+            Some(
+                NaiveDate::from_ymd_opt(2000, 12, 31)
+                    .unwrap()
+                    .and_hms_nano_opt(23, 59, 59, 999999999)
+                    .unwrap()
+                    .timestamp_nanos(),
+            ),
+            None,
+        );
+        let interval_scalar = ScalarValue::new_interval_mdn(13, 32, 123456);
+
+        experiment(timestamp_scalar, interval_scalar)?;
+
         Ok(())
     }
 }