You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/08/15 18:06:54 UTC

[arrow-datafusion] branch master updated: Feature/support timestamp plus minus interval (#3110)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 6509d0d70 Feature/support timestamp plus minus interval (#3110)
6509d0d70 is described below

commit 6509d0d70421ddc7f934c24348000655e86c6b76
Author: JasonLi <li...@126.com>
AuthorDate: Tue Aug 16 02:06:49 2022 +0800

    Feature/support timestamp plus minus interval (#3110)
    
    * support Timestamp +/- Interval
    
    * restore example parquet_sql
    
    * support date and timestamp array +/- interval scalar
    
    Co-authored-by: jasonnnli <ja...@tencent.com>
---
 datafusion/core/tests/sql/timestamp.rs             | 139 +++++++
 datafusion/expr/src/binary_rule.rs                 |   4 +-
 .../physical-expr/src/expressions/datetime.rs      | 440 ++++++++++++++++++---
 datafusion/physical-expr/src/planner.rs            |   2 +-
 4 files changed, 518 insertions(+), 67 deletions(-)

diff --git a/datafusion/core/tests/sql/timestamp.rs b/datafusion/core/tests/sql/timestamp.rs
index 257fa4658..ba916a658 100644
--- a/datafusion/core/tests/sql/timestamp.rs
+++ b/datafusion/core/tests/sql/timestamp.rs
@@ -17,6 +17,7 @@
 
 use super::*;
 use datafusion::from_slice::FromSlice;
+use std::ops::Add;
 
 #[tokio::test]
 async fn query_cast_timestamp_millis() -> Result<()> {
@@ -1256,3 +1257,141 @@ async fn date_bin() {
         "Arrow error: External error: This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
     );
 }
+
+#[tokio::test]
+async fn timestamp_add_interval_second() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "SELECT NOW(), NOW() + INTERVAL '1' SECOND;";
+    let results = execute_to_batches(&ctx, sql).await;
+    let actual = result_vec(&results);
+
+    let res1 = actual[0][0].as_str();
+    let res2 = actual[0][1].as_str();
+
+    let format = "%Y-%m-%d %H:%M:%S%.6f";
+    let t1_naive = chrono::NaiveDateTime::parse_from_str(res1, format).unwrap();
+    let t2_naive = chrono::NaiveDateTime::parse_from_str(res2, format).unwrap();
+
+    assert_eq!(t1_naive.add(Duration::seconds(1)), t2_naive);
+    Ok(())
+}
+
+#[tokio::test]
+async fn timestamp_sub_interval_days() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "SELECT NOW(), NOW() - INTERVAL '8' DAY;";
+    let results = execute_to_batches(&ctx, sql).await;
+    let actual = result_vec(&results);
+
+    let res1 = actual[0][0].as_str();
+    let res2 = actual[0][1].as_str();
+
+    let format = "%Y-%m-%d %H:%M:%S%.6f";
+    let t1_naive = chrono::NaiveDateTime::parse_from_str(res1, format).unwrap();
+    let t2_naive = chrono::NaiveDateTime::parse_from_str(res2, format).unwrap();
+
+    assert_eq!(t1_naive.sub(Duration::days(8)), t2_naive);
+    Ok(())
+}
+
+#[tokio::test]
+async fn timestamp_add_interval_months() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "SELECT NOW(), NOW() + INTERVAL '4' MONTH;";
+    let results = execute_to_batches(&ctx, sql).await;
+    let actual = result_vec(&results);
+
+    let res1 = actual[0][0].as_str();
+    let res2 = actual[0][1].as_str();
+
+    let format = "%Y-%m-%d %H:%M:%S%.6f";
+    let t1_naive = chrono::NaiveDateTime::parse_from_str(res1, format).unwrap();
+    let t2_naive = chrono::NaiveDateTime::parse_from_str(res2, format).unwrap();
+
+    assert_eq!(t1_naive.with_month(t1_naive.month() + 4).unwrap(), t2_naive);
+    Ok(())
+}
+
+#[tokio::test]
+async fn timestamp_sub_interval_years() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "SELECT NOW(), NOW() - INTERVAL '16' YEAR;";
+    let results = execute_to_batches(&ctx, sql).await;
+    let actual = result_vec(&results);
+
+    let res1 = actual[0][0].as_str();
+    let res2 = actual[0][1].as_str();
+
+    let format = "%Y-%m-%d %H:%M:%S%.6f";
+    let t1_naive = chrono::NaiveDateTime::parse_from_str(res1, format).unwrap();
+    let t2_naive = chrono::NaiveDateTime::parse_from_str(res2, format).unwrap();
+
+    assert_eq!(t1_naive.with_year(t1_naive.year() - 16).unwrap(), t2_naive);
+    Ok(())
+}
+
+#[tokio::test]
+async fn timestamp_array_add_interval() -> Result<()> {
+    let ctx = SessionContext::new();
+    let table_a = make_timestamp_table::<TimestampNanosecondType>()?;
+    let table_b = make_timestamp_table::<TimestampMicrosecondType>()?;
+    ctx.register_table("table_a", table_a)?;
+    ctx.register_table("table_b", table_b)?;
+
+    let sql = "SELECT ts, ts - INTERVAL '8' MILLISECONDS FROM table_a";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------------+---------------------------------------+",
+        "| ts                         | table_a.ts Minus IntervalDayTime(\"8\") |",
+        "+----------------------------+---------------------------------------+",
+        "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.182855            |",
+        "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.182855            |",
+        "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.182855            |",
+        "+----------------------------+---------------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+
+    let sql = "SELECT ts, ts + INTERVAL '1' SECOND FROM table_b";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------------+-----------------------------------------+",
+        "| ts                         | table_b.ts Plus IntervalDayTime(\"1000\") |",
+        "+----------------------------+-----------------------------------------+",
+        "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:30.190855              |",
+        "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:30.190855              |",
+        "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:30.190855              |",
+        "+----------------------------+-----------------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+
+    let sql = "SELECT ts, ts + INTERVAL '2' MONTH FROM table_b";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------------+----------------------------------------+",
+        "| ts                         | table_b.ts Plus IntervalYearMonth(\"2\") |",
+        "+----------------------------+----------------------------------------+",
+        "| 2020-09-08 13:42:29.190855 | 2020-11-08 13:42:29.190855             |",
+        "| 2020-09-08 12:42:29.190855 | 2020-11-08 12:42:29.190855             |",
+        "| 2020-09-08 11:42:29.190855 | 2020-11-08 11:42:29.190855             |",
+        "+----------------------------+----------------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+
+    let sql = "SELECT ts, ts - INTERVAL '16' YEAR FROM table_b";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------------+-------------------------------------------+",
+        "| ts                         | table_b.ts Minus IntervalYearMonth(\"192\") |",
+        "+----------------------------+-------------------------------------------+",
+        "| 2020-09-08 13:42:29.190855 | 2004-09-08 13:42:29.190855                |",
+        "| 2020-09-08 12:42:29.190855 | 2004-09-08 12:42:29.190855                |",
+        "| 2020-09-08 11:42:29.190855 | 2004-09-08 11:42:29.190855                |",
+        "+----------------------------+-------------------------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
diff --git a/datafusion/expr/src/binary_rule.rs b/datafusion/expr/src/binary_rule.rs
index f71e97e49..d0057a1c9 100644
--- a/datafusion/expr/src/binary_rule.rs
+++ b/datafusion/expr/src/binary_rule.rs
@@ -99,7 +99,9 @@ pub fn coerce_types(
         Operator::Like | Operator::NotLike => like_coercion(lhs_type, rhs_type),
         // date +/- interval returns date
         Operator::Plus | Operator::Minus
-            if (*lhs_type == DataType::Date32 || *lhs_type == DataType::Date64) =>
+            if (*lhs_type == DataType::Date32
+                || *lhs_type == DataType::Date64
+                || matches!(lhs_type, DataType::Timestamp(_, _))) =>
         {
             match rhs_type {
                 DataType::Interval(_) => Some(lhs_type.clone()),
diff --git a/datafusion/physical-expr/src/expressions/datetime.rs b/datafusion/physical-expr/src/expressions/datetime.rs
index 3c59a90b0..bb76862f4 100644
--- a/datafusion/physical-expr/src/expressions/datetime.rs
+++ b/datafusion/physical-expr/src/expressions/datetime.rs
@@ -17,9 +17,17 @@
 
 use crate::expressions::delta::shift_months;
 use crate::PhysicalExpr;
-use arrow::datatypes::{DataType, Schema};
+use arrow::array::{
+    Array, ArrayRef, Date32Array, Date64Array, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+};
+use arrow::compute::unary;
+use arrow::datatypes::{
+    DataType, Date32Type, Date64Type, Schema, TimeUnit, TimestampMicrosecondType,
+    TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
+};
 use arrow::record_batch::RecordBatch;
-use chrono::{Duration, NaiveDate};
+use chrono::{Datelike, Duration, NaiveDate, NaiveDateTime};
 use datafusion_common::Result;
 use datafusion_common::{DataFusionError, ScalarValue};
 use datafusion_expr::{ColumnarValue, Operator};
@@ -45,19 +53,21 @@ impl DateIntervalExpr {
         input_schema: &Schema,
     ) -> Result<Self> {
         match lhs.data_type(input_schema)? {
-            DataType::Date32 | DataType::Date64 => match rhs.data_type(input_schema)? {
-                DataType::Interval(_) => match &op {
-                    Operator::Plus | Operator::Minus => Ok(Self { lhs, op, rhs }),
-                    _ => Err(DataFusionError::Execution(format!(
-                        "Invalid operator '{}' for DateIntervalExpr",
-                        op
+            DataType::Date32 | DataType::Date64 | DataType::Timestamp(_, _) => {
+                match rhs.data_type(input_schema)? {
+                    DataType::Interval(_) => match &op {
+                        Operator::Plus | Operator::Minus => Ok(Self { lhs, op, rhs }),
+                        _ => Err(DataFusionError::Execution(format!(
+                            "Invalid operator '{}' for DateIntervalExpr",
+                            op
+                        ))),
+                    },
+                    other => Err(DataFusionError::Execution(format!(
+                        "Operation '{}' not support for type {}",
+                        op, other
                     ))),
-                },
-                other => Err(DataFusionError::Execution(format!(
-                    "Invalid rhs type '{}' for DateIntervalExpr",
-                    other
-                ))),
-            },
+                }
+            }
             other => Err(DataFusionError::Execution(format!(
                 "Invalid lhs type '{}' for DateIntervalExpr",
                 other
@@ -89,28 +99,8 @@ impl PhysicalExpr for DateIntervalExpr {
         let dates = self.lhs.evaluate(batch)?;
         let intervals = self.rhs.evaluate(batch)?;
 
-        // Unwrap days since epoch
-        let operand = match dates {
-            ColumnarValue::Scalar(scalar) => scalar,
-            _ => Err(DataFusionError::Execution(
-                "Columnar execution is not yet supported for DateIntervalExpr"
-                    .to_string(),
-            ))?,
-        };
-
-        // Convert to NaiveDate
-        let epoch = NaiveDate::from_ymd(1970, 1, 1);
-        let prior = match operand {
-            ScalarValue::Date32(Some(d)) => epoch.add(Duration::days(d as i64)),
-            ScalarValue::Date64(Some(ms)) => epoch.add(Duration::milliseconds(ms)),
-            _ => Err(DataFusionError::Execution(format!(
-                "Invalid lhs type for DateIntervalExpr: {:?}",
-                operand
-            )))?,
-        };
-
         // Unwrap interval to add
-        let scalar = match &intervals {
+        let intervals = match &intervals {
             ColumnarValue::Scalar(interval) => interval,
             _ => Err(DataFusionError::Execution(
                 "Columnar execution is not yet supported for DateIntervalExpr"
@@ -130,38 +120,196 @@ impl PhysicalExpr for DateIntervalExpr {
             }
         };
 
-        // Do math
-        let posterior = match scalar {
-            ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
-            ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * sign),
-            ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign),
-            other => Err(DataFusionError::Execution(format!(
-                "DateIntervalExpr does not support non-interval type {:?}",
-                other
-            )))?,
-        };
-
-        // convert back
-        let res = match operand {
-            ScalarValue::Date32(Some(_)) => {
-                let days = posterior.sub(epoch).num_days() as i32;
-                ColumnarValue::Scalar(ScalarValue::Date32(Some(days)))
-            }
-            ScalarValue::Date64(Some(_)) => {
-                let ms = posterior.sub(epoch).num_milliseconds();
-                ColumnarValue::Scalar(ScalarValue::Date64(Some(ms)))
-            }
-            _ => Err(DataFusionError::Execution(format!(
-                "Invalid lhs type for DateIntervalExpr: {}",
-                scalar
-            )))?,
-        };
-        Ok(res)
+        match dates {
+            ColumnarValue::Scalar(operand) => evaluate_scalar(operand, sign, intervals),
+            ColumnarValue::Array(array) => evaluate_array(array, sign, intervals),
+        }
     }
 }
 
+pub fn evaluate_array(
+    array: ArrayRef,
+    sign: i32,
+    scalar: &ScalarValue,
+) -> Result<ColumnarValue> {
+    let ret = match array.data_type() {
+        DataType::Date32 => {
+            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
+            Arc::new(unary::<Date32Type, _, Date32Type>(array, |days| {
+                date32_add(days, scalar, sign).unwrap()
+            })) as ArrayRef
+        }
+        DataType::Date64 => {
+            let array = array.as_any().downcast_ref::<Date64Array>().unwrap();
+            Arc::new(unary::<Date64Type, _, Date64Type>(array, |ms| {
+                date64_add(ms, scalar, sign).unwrap()
+            })) as ArrayRef
+        }
+        DataType::Timestamp(TimeUnit::Second, _) => {
+            let array = array
+                .as_any()
+                .downcast_ref::<TimestampSecondArray>()
+                .unwrap();
+            Arc::new(unary::<TimestampSecondType, _, TimestampSecondType>(
+                array,
+                |ts_s| seconds_add(ts_s, scalar, sign).unwrap(),
+            )) as ArrayRef
+        }
+        DataType::Timestamp(TimeUnit::Millisecond, _) => {
+            let array = array
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap();
+            Arc::new(
+                unary::<TimestampMillisecondType, _, TimestampMillisecondType>(
+                    array,
+                    |ts_ms| milliseconds_add(ts_ms, scalar, sign).unwrap(),
+                ),
+            ) as ArrayRef
+        }
+        DataType::Timestamp(TimeUnit::Microsecond, _) => {
+            let array = array
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap();
+            Arc::new(
+                unary::<TimestampMicrosecondType, _, TimestampMicrosecondType>(
+                    array,
+                    |ts_us| microseconds_add(ts_us, scalar, sign).unwrap(),
+                ),
+            ) as ArrayRef
+        }
+        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+            let array = array
+                .as_any()
+                .downcast_ref::<TimestampNanosecondArray>()
+                .unwrap();
+            Arc::new(
+                unary::<TimestampNanosecondType, _, TimestampNanosecondType>(
+                    array,
+                    |ts_ns| nanoseconds_add(ts_ns, scalar, sign).unwrap(),
+                ),
+            ) as ArrayRef
+        }
+        _ => Err(DataFusionError::Execution(format!(
+            "Invalid lhs type for DateIntervalExpr: {}",
+            array.data_type()
+        )))?,
+    };
+    Ok(ColumnarValue::Array(ret))
+}
+
+fn evaluate_scalar(
+    operand: ScalarValue,
+    sign: i32,
+    scalar: &ScalarValue,
+) -> Result<ColumnarValue> {
+    let res = match operand {
+        ScalarValue::Date32(Some(days)) => {
+            let value = date32_add(days, scalar, sign)?;
+            ColumnarValue::Scalar(ScalarValue::Date32(Some(value)))
+        }
+        ScalarValue::Date64(Some(ms)) => {
+            let value = date64_add(ms, scalar, sign)?;
+            ColumnarValue::Scalar(ScalarValue::Date64(Some(value)))
+        }
+        ScalarValue::TimestampSecond(Some(ts_s), zone) => {
+            let value = seconds_add(ts_s, scalar, sign)?;
+            ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(value), zone))
+        }
+        ScalarValue::TimestampMillisecond(Some(ts_ms), zone) => {
+            let value = milliseconds_add(ts_ms, scalar, sign)?;
+            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(value), zone))
+        }
+        ScalarValue::TimestampMicrosecond(Some(ts_us), zone) => {
+            let value = microseconds_add(ts_us, scalar, sign)?;
+            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(value), zone))
+        }
+        ScalarValue::TimestampNanosecond(Some(ts_ns), zone) => {
+            let value = nanoseconds_add(ts_ns, scalar, sign)?;
+            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(value), zone))
+        }
+        _ => Err(DataFusionError::Execution(format!(
+            "Invalid lhs type {} for DateIntervalExpr",
+            operand.get_datatype()
+        )))?,
+    };
+    Ok(res)
+}
+
+#[inline]
+fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {
+    let epoch = NaiveDate::from_ymd(1970, 1, 1);
+    let prior = epoch.add(Duration::days(days as i64));
+    let posterior = do_date_math(prior, scalar, sign)?;
+    Ok(posterior.sub(epoch).num_days() as i32)
+}
+
+#[inline]
+fn date64_add(ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let epoch = NaiveDate::from_ymd(1970, 1, 1);
+    let prior = epoch.add(Duration::milliseconds(ms));
+    let posterior = do_date_math(prior, scalar, sign)?;
+    Ok(posterior.sub(epoch).num_milliseconds())
+}
+
+#[inline]
+fn seconds_add(ts_s: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    Ok(do_data_time_math(ts_s, 0, scalar, sign)?.timestamp())
+}
+
+#[inline]
+fn milliseconds_add(ts_ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let secs = ts_ms / 1000;
+    let nsecs = ((ts_ms % 1000) * 1_000_000) as u32;
+    Ok(do_data_time_math(secs, nsecs, scalar, sign)?.timestamp_millis())
+}
+
+#[inline]
+fn microseconds_add(ts_us: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let secs = ts_us / 1_000_000;
+    let nsecs = ((ts_us % 1_000_000) * 1000) as u32;
+    Ok(do_data_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos() / 1000)
+}
+
+#[inline]
+fn nanoseconds_add(ts_ns: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let secs = ts_ns / 1_000_000_000;
+    let nsecs = (ts_ns % 1_000_000_000) as u32;
+    Ok(do_data_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos())
+}
+
+#[inline]
+fn do_data_time_math(
+    secs: i64,
+    nsecs: u32,
+    scalar: &ScalarValue,
+    sign: i32,
+) -> Result<NaiveDateTime> {
+    let prior = NaiveDateTime::from_timestamp(secs, nsecs);
+    do_date_math(prior, scalar, sign)
+}
+
+fn do_date_math<D>(prior: D, scalar: &ScalarValue, sign: i32) -> Result<D>
+where
+    D: Datelike + Add<Duration, Output = D>,
+{
+    Ok(match scalar {
+        ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
+        ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * sign),
+        ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign),
+        other => Err(DataFusionError::Execution(format!(
+            "DateIntervalExpr does not support non-interval type {:?}",
+            other
+        )))?,
+    })
+}
+
 // Can remove once https://github.com/apache/arrow-rs/pull/2031 is released
-fn add_m_d_nano(prior: NaiveDate, interval: i128, sign: i32) -> NaiveDate {
+fn add_m_d_nano<D>(prior: D, interval: i128, sign: i32) -> D
+where
+    D: Datelike + Add<Duration, Output = D>,
+{
     let interval = interval as u128;
     let nanos = (interval >> 64) as i64 * sign as i64;
     let days = (interval >> 32) as i32 * sign;
@@ -172,7 +320,10 @@ fn add_m_d_nano(prior: NaiveDate, interval: i128, sign: i32) -> NaiveDate {
 }
 
 // Can remove once https://github.com/apache/arrow-rs/pull/2031 is released
-fn add_day_time(prior: NaiveDate, interval: i64, sign: i32) -> NaiveDate {
+fn add_day_time<D>(prior: D, interval: i64, sign: i32) -> D
+where
+    D: Datelike + Add<Duration, Output = D>,
+{
     let interval = interval as u64;
     let days = (interval >> 32) as i32 * sign;
     let ms = interval as i32 * sign;
@@ -187,7 +338,7 @@ mod tests {
     use crate::execution_props::ExecutionProps;
     use arrow::array::{ArrayRef, Date32Builder};
     use arrow::datatypes::*;
-    use datafusion_common::{Result, ToDFSchema};
+    use datafusion_common::{Column, Result, ToDFSchema};
     use datafusion_expr::Expr;
 
     #[test]
@@ -362,6 +513,165 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn add_1_millisecond() -> Result<()> {
+        // setup
+        let now_ts_ns = chrono::Utc::now().timestamp_nanos();
+        let dt = Expr::Literal(ScalarValue::TimestampNanosecond(Some(now_ts_ns), None));
+        let op = Operator::Plus;
+        let interval = create_day_time(0, 1);
+        let interval = Expr::Literal(ScalarValue::IntervalDayTime(Some(interval)));
+
+        // exercise
+        let res = exercise(&dt, op, &interval)?;
+
+        // assert
+        match res {
+            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(ts), None)) => {
+                assert_eq!(ts, now_ts_ns + 1_000_000);
+            }
+            _ => Err(DataFusionError::NotImplemented(
+                "Unexpected result!".to_string(),
+            ))?,
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn add_2_hours() -> Result<()> {
+        // setup
+        let now_ts_s = chrono::Utc::now().timestamp();
+        let dt = Expr::Literal(ScalarValue::TimestampSecond(Some(now_ts_s), None));
+        let op = Operator::Plus;
+        let interval = create_day_time(0, 2 * 3600 * 1_000);
+        let interval = Expr::Literal(ScalarValue::IntervalDayTime(Some(interval)));
+
+        // exercise
+        let res = exercise(&dt, op, &interval)?;
+
+        // assert
+        match res {
+            ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(ts), None)) => {
+                assert_eq!(ts, now_ts_s + 2 * 3600);
+            }
+            _ => Err(DataFusionError::NotImplemented(
+                "Unexpected result!".to_string(),
+            ))?,
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn sub_4_hours() -> Result<()> {
+        // setup
+        let now_ts_s = chrono::Utc::now().timestamp();
+        let dt = Expr::Literal(ScalarValue::TimestampSecond(Some(now_ts_s), None));
+        let op = Operator::Minus;
+        let interval = create_day_time(0, 4 * 3600 * 1_000);
+        let interval = Expr::Literal(ScalarValue::IntervalDayTime(Some(interval)));
+
+        // exercise
+        let res = exercise(&dt, op, &interval)?;
+
+        // assert
+        match res {
+            ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(ts), None)) => {
+                assert_eq!(ts, now_ts_s - 4 * 3600);
+            }
+            _ => Err(DataFusionError::NotImplemented(
+                "Unexpected result!".to_string(),
+            ))?,
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn add_8_days() -> Result<()> {
+        // setup
+        let now_ts_ns = chrono::Utc::now().timestamp_nanos();
+        let dt = Expr::Literal(ScalarValue::TimestampNanosecond(Some(now_ts_ns), None));
+        let op = Operator::Plus;
+        let interval = create_day_time(8, 0);
+        let interval = Expr::Literal(ScalarValue::IntervalDayTime(Some(interval)));
+
+        // exercise
+        let res = exercise(&dt, op, &interval)?;
+
+        // assert
+        match res {
+            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(ts), None)) => {
+                assert_eq!(ts, now_ts_ns + 8 * 86400 * 1_000_000_000);
+            }
+            _ => Err(DataFusionError::NotImplemented(
+                "Unexpected result!".to_string(),
+            ))?,
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn sub_16_days() -> Result<()> {
+        // setup
+        let now_ts_ns = chrono::Utc::now().timestamp_nanos();
+        let dt = Expr::Literal(ScalarValue::TimestampNanosecond(Some(now_ts_ns), None));
+        let op = Operator::Minus;
+        let interval = create_day_time(16, 0);
+        let interval = Expr::Literal(ScalarValue::IntervalDayTime(Some(interval)));
+
+        // exercise
+        let res = exercise(&dt, op, &interval)?;
+
+        // assert
+        match res {
+            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(ts), None)) => {
+                assert_eq!(ts, now_ts_ns - 16 * 86400 * 1_000_000_000);
+            }
+            _ => Err(DataFusionError::NotImplemented(
+                "Unexpected result!".to_string(),
+            ))?,
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn array_add_26_days() -> Result<()> {
+        let mut builder = Date32Builder::new(8);
+        builder.append_slice(&[0, 1, 2, 3, 4, 5, 6, 7]);
+        let a: ArrayRef = Arc::new(builder.finish());
+
+        let schema = Schema::new(vec![Field::new("a", DataType::Date32, false)]);
+        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?;
+        let dfs = schema.clone().to_dfschema()?;
+        let props = ExecutionProps::new();
+
+        let dt = Expr::Column(Column::from_name("a"));
+        let interval = create_day_time(26, 0);
+        let interval = Expr::Literal(ScalarValue::IntervalDayTime(Some(interval)));
+        let op = Operator::Plus;
+
+        let lhs = create_physical_expr(&dt, &dfs, &schema, &props)?;
+        let rhs = create_physical_expr(&interval, &dfs, &schema, &props)?;
+
+        let cut = DateIntervalExpr::try_new(lhs, op, rhs, &schema)?;
+        let res = cut.evaluate(&batch)?;
+
+        let mut builder = Date32Builder::new(8);
+        builder.append_slice(&[26, 27, 28, 29, 30, 31, 32, 33]);
+        let expected: ArrayRef = Arc::new(builder.finish());
+
+        // assert
+        match res {
+            ColumnarValue::Array(array) => {
+                assert_eq!(&array, &expected)
+            }
+            _ => Err(DataFusionError::NotImplemented(
+                "Unexpected result!".to_string(),
+            ))?,
+        }
+
+        Ok(())
+    }
+
     #[test]
     fn invalid_interval() -> Result<()> {
         // setup
diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs
index a7f774073..ffb3f2939 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -90,7 +90,7 @@ pub fn create_physical_expr(
                 rhs.data_type(input_schema)?,
             ) {
                 (
-                    DataType::Date32 | DataType::Date64,
+                    DataType::Date32 | DataType::Date64 | DataType::Timestamp(_, _),
                     Operator::Plus | Operator::Minus,
                     DataType::Interval(_),
                 ) => Ok(Arc::new(DateIntervalExpr::try_new(