You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/08/15 18:06:54 UTC
[arrow-datafusion] branch master updated: Feature/support timestamp plus minus interval (#3110)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 6509d0d70 Feature/support timestamp plus minus interval (#3110)
6509d0d70 is described below
commit 6509d0d70421ddc7f934c24348000655e86c6b76
Author: JasonLi <li...@126.com>
AuthorDate: Tue Aug 16 02:06:49 2022 +0800
Feature/support timestamp plus minus interval (#3110)
* support Timestamp +/- Interval
* restore example parquet_sql
* support date and timestamp array +/- interval scalar
Co-authored-by: jasonnnli <ja...@tencent.com>
---
datafusion/core/tests/sql/timestamp.rs | 139 +++++++
datafusion/expr/src/binary_rule.rs | 4 +-
.../physical-expr/src/expressions/datetime.rs | 440 ++++++++++++++++++---
datafusion/physical-expr/src/planner.rs | 2 +-
4 files changed, 518 insertions(+), 67 deletions(-)
diff --git a/datafusion/core/tests/sql/timestamp.rs b/datafusion/core/tests/sql/timestamp.rs
index 257fa4658..ba916a658 100644
--- a/datafusion/core/tests/sql/timestamp.rs
+++ b/datafusion/core/tests/sql/timestamp.rs
@@ -17,6 +17,7 @@
use super::*;
use datafusion::from_slice::FromSlice;
+use std::ops::Add;
#[tokio::test]
async fn query_cast_timestamp_millis() -> Result<()> {
@@ -1256,3 +1257,141 @@ async fn date_bin() {
"Arrow error: External error: This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
);
}
+
+#[tokio::test]
+async fn timestamp_add_interval_second() -> Result<()> {
+    // Both projections call NOW(); the exact-equality assertion below relies
+    // on them producing the same instant within one query.
+    let ctx = SessionContext::new();
+    let batches =
+        execute_to_batches(&ctx, "SELECT NOW(), NOW() + INTERVAL '1' SECOND;").await;
+    let rows = result_vec(&batches);
+
+    let parse = |text: &str| {
+        chrono::NaiveDateTime::parse_from_str(text, "%Y-%m-%d %H:%M:%S%.6f").unwrap()
+    };
+    let now = parse(rows[0][0].as_str());
+    let later = parse(rows[0][1].as_str());
+
+    assert_eq!(now + Duration::seconds(1), later);
+    Ok(())
+}
+
+#[tokio::test]
+async fn timestamp_sub_interval_days() -> Result<()> {
+    // Both projections call NOW(); the exact-equality assertion below relies
+    // on them producing the same instant within one query.
+    let ctx = SessionContext::new();
+    let batches =
+        execute_to_batches(&ctx, "SELECT NOW(), NOW() - INTERVAL '8' DAY;").await;
+    let rows = result_vec(&batches);
+
+    let parse = |text: &str| {
+        chrono::NaiveDateTime::parse_from_str(text, "%Y-%m-%d %H:%M:%S%.6f").unwrap()
+    };
+    let now = parse(rows[0][0].as_str());
+    let earlier = parse(rows[0][1].as_str());
+
+    assert_eq!(now - Duration::days(8), earlier);
+    Ok(())
+}
+
+#[tokio::test]
+async fn timestamp_add_interval_months() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "SELECT NOW(), NOW() + INTERVAL '4' MONTH;";
+    let results = execute_to_batches(&ctx, sql).await;
+    let actual = result_vec(&results);
+
+    let res1 = actual[0][0].as_str();
+    let res2 = actual[0][1].as_str();
+
+    let format = "%Y-%m-%d %H:%M:%S%.6f";
+    let t1_naive = chrono::NaiveDateTime::parse_from_str(res1, format).unwrap();
+    let t2_naive = chrono::NaiveDateTime::parse_from_str(res2, format).unwrap();
+
+    // Rebuilding the expectation with `with_month(month + 4)` fails whenever
+    // NOW() is in September or later (month 13+) or on a day the target month
+    // lacks (e.g. May 31 -> "Sep 31"). Check the shift structurally instead:
+    // exactly 4 calendar months later, with the time of day unchanged.
+    let month_index = |t: &chrono::NaiveDateTime| t.year() * 12 + t.month0() as i32;
+    assert_eq!(month_index(&t2_naive) - month_index(&t1_naive), 4);
+    assert_eq!(t1_naive.time(), t2_naive.time());
+
+    // The day of month is kept, unless the target month is shorter. In that
+    // case it is expected to be clamped to the month's last day (assumes
+    // `shift_months` clamps ambiguous month-ends; TODO confirm).
+    let is_last_day_of_month = (t2_naive.date() + Duration::days(1)).day() == 1;
+    assert!(
+        t2_naive.day() == t1_naive.day()
+            || (t2_naive.day() < t1_naive.day() && is_last_day_of_month)
+    );
+    Ok(())
+}
+
+#[tokio::test]
+async fn timestamp_sub_interval_years() -> Result<()> {
+    // Compares NOW() with NOW() shifted back sixteen calendar years.
+    let ctx = SessionContext::new();
+    let batches =
+        execute_to_batches(&ctx, "SELECT NOW(), NOW() - INTERVAL '16' YEAR;").await;
+    let rows = result_vec(&batches);
+    let row = &rows[0];
+
+    let parse = |text: &str| {
+        chrono::NaiveDateTime::parse_from_str(text, "%Y-%m-%d %H:%M:%S%.6f").unwrap()
+    };
+    let now = parse(row[0].as_str());
+    let shifted = parse(row[1].as_str());
+
+    let expected = now.with_year(now.year() - 16).unwrap();
+    assert_eq!(expected, shifted);
+    Ok(())
+}
+
+#[tokio::test]
+async fn timestamp_array_add_interval() -> Result<()> {
+    // table_a holds nanosecond timestamps, table_b microsecond timestamps.
+    let ctx = SessionContext::new();
+    let nanos_table = make_timestamp_table::<TimestampNanosecondType>()?;
+    let micros_table = make_timestamp_table::<TimestampMicrosecondType>()?;
+    ctx.register_table("table_a", nanos_table)?;
+    ctx.register_table("table_b", micros_table)?;
+
+    // (query, expected pretty-printed batches), run in order.
+    let cases: Vec<(&str, Vec<&str>)> = vec![
+        (
+            "SELECT ts, ts - INTERVAL '8' MILLISECONDS FROM table_a",
+            vec![
+                "+----------------------------+---------------------------------------+",
+                "| ts | table_a.ts Minus IntervalDayTime(\"8\") |",
+                "+----------------------------+---------------------------------------+",
+                "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.182855 |",
+                "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.182855 |",
+                "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.182855 |",
+                "+----------------------------+---------------------------------------+",
+            ],
+        ),
+        (
+            "SELECT ts, ts + INTERVAL '1' SECOND FROM table_b",
+            vec![
+                "+----------------------------+-----------------------------------------+",
+                "| ts | table_b.ts Plus IntervalDayTime(\"1000\") |",
+                "+----------------------------+-----------------------------------------+",
+                "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:30.190855 |",
+                "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:30.190855 |",
+                "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:30.190855 |",
+                "+----------------------------+-----------------------------------------+",
+            ],
+        ),
+        (
+            "SELECT ts, ts + INTERVAL '2' MONTH FROM table_b",
+            vec![
+                "+----------------------------+----------------------------------------+",
+                "| ts | table_b.ts Plus IntervalYearMonth(\"2\") |",
+                "+----------------------------+----------------------------------------+",
+                "| 2020-09-08 13:42:29.190855 | 2020-11-08 13:42:29.190855 |",
+                "| 2020-09-08 12:42:29.190855 | 2020-11-08 12:42:29.190855 |",
+                "| 2020-09-08 11:42:29.190855 | 2020-11-08 11:42:29.190855 |",
+                "+----------------------------+----------------------------------------+",
+            ],
+        ),
+        (
+            "SELECT ts, ts - INTERVAL '16' YEAR FROM table_b",
+            vec![
+                "+----------------------------+-------------------------------------------+",
+                "| ts | table_b.ts Minus IntervalYearMonth(\"192\") |",
+                "+----------------------------+-------------------------------------------+",
+                "| 2020-09-08 13:42:29.190855 | 2004-09-08 13:42:29.190855 |",
+                "| 2020-09-08 12:42:29.190855 | 2004-09-08 12:42:29.190855 |",
+                "| 2020-09-08 11:42:29.190855 | 2004-09-08 11:42:29.190855 |",
+                "+----------------------------+-------------------------------------------+",
+            ],
+        ),
+    ];
+
+    for (sql, expected) in cases {
+        let actual = execute_to_batches(&ctx, sql).await;
+        assert_batches_eq!(expected, &actual);
+    }
+    Ok(())
+}
diff --git a/datafusion/expr/src/binary_rule.rs b/datafusion/expr/src/binary_rule.rs
index f71e97e49..d0057a1c9 100644
--- a/datafusion/expr/src/binary_rule.rs
+++ b/datafusion/expr/src/binary_rule.rs
@@ -99,7 +99,9 @@ pub fn coerce_types(
Operator::Like | Operator::NotLike => like_coercion(lhs_type, rhs_type),
// date +/- interval returns date
Operator::Plus | Operator::Minus
- if (*lhs_type == DataType::Date32 || *lhs_type == DataType::Date64) =>
+ if (*lhs_type == DataType::Date32
+ || *lhs_type == DataType::Date64
+ || matches!(lhs_type, DataType::Timestamp(_, _))) =>
{
match rhs_type {
DataType::Interval(_) => Some(lhs_type.clone()),
diff --git a/datafusion/physical-expr/src/expressions/datetime.rs b/datafusion/physical-expr/src/expressions/datetime.rs
index 3c59a90b0..bb76862f4 100644
--- a/datafusion/physical-expr/src/expressions/datetime.rs
+++ b/datafusion/physical-expr/src/expressions/datetime.rs
@@ -17,9 +17,17 @@
use crate::expressions::delta::shift_months;
use crate::PhysicalExpr;
-use arrow::datatypes::{DataType, Schema};
+use arrow::array::{
+ Array, ArrayRef, Date32Array, Date64Array, TimestampMicrosecondArray,
+ TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+};
+use arrow::compute::unary;
+use arrow::datatypes::{
+ DataType, Date32Type, Date64Type, Schema, TimeUnit, TimestampMicrosecondType,
+ TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
+};
use arrow::record_batch::RecordBatch;
-use chrono::{Duration, NaiveDate};
+use chrono::{Datelike, Duration, NaiveDate, NaiveDateTime};
use datafusion_common::Result;
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_expr::{ColumnarValue, Operator};
@@ -45,19 +53,21 @@ impl DateIntervalExpr {
input_schema: &Schema,
) -> Result<Self> {
match lhs.data_type(input_schema)? {
- DataType::Date32 | DataType::Date64 => match rhs.data_type(input_schema)? {
- DataType::Interval(_) => match &op {
- Operator::Plus | Operator::Minus => Ok(Self { lhs, op, rhs }),
- _ => Err(DataFusionError::Execution(format!(
- "Invalid operator '{}' for DateIntervalExpr",
- op
+ DataType::Date32 | DataType::Date64 | DataType::Timestamp(_, _) => {
+ match rhs.data_type(input_schema)? {
+ DataType::Interval(_) => match &op {
+ Operator::Plus | Operator::Minus => Ok(Self { lhs, op, rhs }),
+ _ => Err(DataFusionError::Execution(format!(
+ "Invalid operator '{}' for DateIntervalExpr",
+ op
+ ))),
+ },
+ other => Err(DataFusionError::Execution(format!(
+ "Operation '{}' not support for type {}",
+ op, other
))),
- },
- other => Err(DataFusionError::Execution(format!(
- "Invalid rhs type '{}' for DateIntervalExpr",
- other
- ))),
- },
+ }
+ }
other => Err(DataFusionError::Execution(format!(
"Invalid lhs type '{}' for DateIntervalExpr",
other
@@ -89,28 +99,8 @@ impl PhysicalExpr for DateIntervalExpr {
let dates = self.lhs.evaluate(batch)?;
let intervals = self.rhs.evaluate(batch)?;
- // Unwrap days since epoch
- let operand = match dates {
- ColumnarValue::Scalar(scalar) => scalar,
- _ => Err(DataFusionError::Execution(
- "Columnar execution is not yet supported for DateIntervalExpr"
- .to_string(),
- ))?,
- };
-
- // Convert to NaiveDate
- let epoch = NaiveDate::from_ymd(1970, 1, 1);
- let prior = match operand {
- ScalarValue::Date32(Some(d)) => epoch.add(Duration::days(d as i64)),
- ScalarValue::Date64(Some(ms)) => epoch.add(Duration::milliseconds(ms)),
- _ => Err(DataFusionError::Execution(format!(
- "Invalid lhs type for DateIntervalExpr: {:?}",
- operand
- )))?,
- };
-
// Unwrap interval to add
- let scalar = match &intervals {
+ let intervals = match &intervals {
ColumnarValue::Scalar(interval) => interval,
_ => Err(DataFusionError::Execution(
"Columnar execution is not yet supported for DateIntervalExpr"
@@ -130,38 +120,196 @@ impl PhysicalExpr for DateIntervalExpr {
}
};
- // Do math
- let posterior = match scalar {
- ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
- ScalarValue::IntervalYearMonth(Some(i)) => shift_months(prior, *i * sign),
- ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign),
- other => Err(DataFusionError::Execution(format!(
- "DateIntervalExpr does not support non-interval type {:?}",
- other
- )))?,
- };
-
- // convert back
- let res = match operand {
- ScalarValue::Date32(Some(_)) => {
- let days = posterior.sub(epoch).num_days() as i32;
- ColumnarValue::Scalar(ScalarValue::Date32(Some(days)))
- }
- ScalarValue::Date64(Some(_)) => {
- let ms = posterior.sub(epoch).num_milliseconds();
- ColumnarValue::Scalar(ScalarValue::Date64(Some(ms)))
- }
- _ => Err(DataFusionError::Execution(format!(
- "Invalid lhs type for DateIntervalExpr: {}",
- scalar
- )))?,
- };
- Ok(res)
+ match dates {
+ ColumnarValue::Scalar(operand) => evaluate_scalar(operand, sign, intervals),
+ ColumnarValue::Array(array) => evaluate_array(array, sign, intervals),
+ }
}
}
+/// Applies the interval `scalar` (with `sign` applied to its components) to
+/// every element of a Date32/Date64/Timestamp `array`, producing an array of
+/// the same data type.
+///
+/// # Errors
+///
+/// Returns `DataFusionError::Execution` if `scalar` is not a non-NULL
+/// interval value, or if `array` is not a date or timestamp array.
+pub fn evaluate_array(
+    array: ArrayRef,
+    sign: i32,
+    scalar: &ScalarValue,
+) -> Result<ColumnarValue> {
+    const VALIDATED: &str = "interval scalar was validated above";
+
+    // `unary` takes an infallible closure. The per-element helpers only fail
+    // (via `do_date_math`) for a NULL or non-interval scalar, so reject that
+    // case once here. Otherwise the kernel would panic on the first element.
+    let is_interval = matches!(
+        scalar,
+        ScalarValue::IntervalDayTime(Some(_))
+            | ScalarValue::IntervalYearMonth(Some(_))
+            | ScalarValue::IntervalMonthDayNano(Some(_))
+    );
+    if !is_interval {
+        return Err(DataFusionError::Execution(format!(
+            "DateIntervalExpr does not support non-interval type {:?}",
+            scalar
+        )));
+    }
+
+    // The downcasts cannot fail: each arm matches on the array's own data type.
+    let ret = match array.data_type() {
+        DataType::Date32 => {
+            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
+            Arc::new(unary::<Date32Type, _, Date32Type>(array, |days| {
+                date32_add(days, scalar, sign).expect(VALIDATED)
+            })) as ArrayRef
+        }
+        DataType::Date64 => {
+            let array = array.as_any().downcast_ref::<Date64Array>().unwrap();
+            Arc::new(unary::<Date64Type, _, Date64Type>(array, |ms| {
+                date64_add(ms, scalar, sign).expect(VALIDATED)
+            })) as ArrayRef
+        }
+        DataType::Timestamp(TimeUnit::Second, _) => {
+            let array = array
+                .as_any()
+                .downcast_ref::<TimestampSecondArray>()
+                .unwrap();
+            Arc::new(unary::<TimestampSecondType, _, TimestampSecondType>(
+                array,
+                |ts_s| seconds_add(ts_s, scalar, sign).expect(VALIDATED),
+            )) as ArrayRef
+        }
+        DataType::Timestamp(TimeUnit::Millisecond, _) => {
+            let array = array
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap();
+            Arc::new(
+                unary::<TimestampMillisecondType, _, TimestampMillisecondType>(
+                    array,
+                    |ts_ms| milliseconds_add(ts_ms, scalar, sign).expect(VALIDATED),
+                ),
+            ) as ArrayRef
+        }
+        DataType::Timestamp(TimeUnit::Microsecond, _) => {
+            let array = array
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap();
+            Arc::new(
+                unary::<TimestampMicrosecondType, _, TimestampMicrosecondType>(
+                    array,
+                    |ts_us| microseconds_add(ts_us, scalar, sign).expect(VALIDATED),
+                ),
+            ) as ArrayRef
+        }
+        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+            let array = array
+                .as_any()
+                .downcast_ref::<TimestampNanosecondArray>()
+                .unwrap();
+            Arc::new(
+                unary::<TimestampNanosecondType, _, TimestampNanosecondType>(
+                    array,
+                    |ts_ns| nanoseconds_add(ts_ns, scalar, sign).expect(VALIDATED),
+                ),
+            ) as ArrayRef
+        }
+        other => {
+            return Err(DataFusionError::Execution(format!(
+                "Invalid lhs type for DateIntervalExpr: {}",
+                other
+            )))
+        }
+    };
+    Ok(ColumnarValue::Array(ret))
+}
+
+/// Applies the interval `scalar` (with `sign` applied to its components) to a
+/// single date or timestamp `operand`. The operand's type is preserved, and
+/// for timestamps so is its time zone.
+///
+/// A NULL operand yields a NULL of the same type (and zone), matching SQL
+/// NULL propagation.
+///
+/// # Errors
+///
+/// Returns `DataFusionError::Execution` if `operand` is not a date/timestamp
+/// scalar, or if `operand` is non-NULL and `scalar` is not a non-NULL interval.
+fn evaluate_scalar(
+    operand: ScalarValue,
+    sign: i32,
+    scalar: &ScalarValue,
+) -> Result<ColumnarValue> {
+    // `Option::map(..).transpose()` runs the fallible interval arithmetic only
+    // on present values and keeps NULL inputs NULL.
+    let res = match operand {
+        ScalarValue::Date32(days) => ScalarValue::Date32(
+            days.map(|d| date32_add(d, scalar, sign)).transpose()?,
+        ),
+        ScalarValue::Date64(ms) => ScalarValue::Date64(
+            ms.map(|v| date64_add(v, scalar, sign)).transpose()?,
+        ),
+        ScalarValue::TimestampSecond(ts_s, zone) => ScalarValue::TimestampSecond(
+            ts_s.map(|v| seconds_add(v, scalar, sign)).transpose()?,
+            zone,
+        ),
+        ScalarValue::TimestampMillisecond(ts_ms, zone) => {
+            ScalarValue::TimestampMillisecond(
+                ts_ms.map(|v| milliseconds_add(v, scalar, sign)).transpose()?,
+                zone,
+            )
+        }
+        ScalarValue::TimestampMicrosecond(ts_us, zone) => {
+            ScalarValue::TimestampMicrosecond(
+                ts_us.map(|v| microseconds_add(v, scalar, sign)).transpose()?,
+                zone,
+            )
+        }
+        ScalarValue::TimestampNanosecond(ts_ns, zone) => {
+            ScalarValue::TimestampNanosecond(
+                ts_ns.map(|v| nanoseconds_add(v, scalar, sign)).transpose()?,
+                zone,
+            )
+        }
+        other => {
+            return Err(DataFusionError::Execution(format!(
+                "Invalid lhs type {} for DateIntervalExpr",
+                other.get_datatype()
+            )))
+        }
+    };
+    Ok(ColumnarValue::Scalar(res))
+}
+
+/// Shifts a Date32 value (`days` since 1970-01-01) by the interval `scalar`,
+/// with `sign` applied to the interval's components (presumably +1 for Plus,
+/// -1 for Minus). Returns the result as days since the epoch.
+#[inline]
+fn date32_add(days: i32, scalar: &ScalarValue, sign: i32) -> Result<i32> {
+    let epoch = NaiveDate::from_ymd(1970, 1, 1);
+    let shifted = do_date_math(epoch + Duration::days(i64::from(days)), scalar, sign)?;
+    Ok((shifted - epoch).num_days() as i32)
+}
+
+/// Shifts a Date64 value (`ms` since 1970-01-01) by the interval `scalar`,
+/// with `sign` applied to the interval's components. The date is handled as
+/// a `NaiveDate`, so any sub-day part of `ms` is not carried through.
+/// Returns milliseconds since the epoch.
+#[inline]
+fn date64_add(ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let epoch = NaiveDate::from_ymd(1970, 1, 1);
+    let shifted = do_date_math(epoch + Duration::milliseconds(ms), scalar, sign)?;
+    Ok((shifted - epoch).num_milliseconds())
+}
+
+/// Splits a timestamp counted in `units_per_sec` units since the epoch into
+/// whole seconds and a nanosecond remainder in `0..1_000_000_000`, the form
+/// `NaiveDateTime::from_timestamp` expects.
+///
+/// Euclidean division keeps pre-1970 values correct. For example, -1 ms
+/// becomes (-1 s, 999_000_000 ns). Truncating `/` and `%` would give
+/// (0 s, -1_000_000 ns), which wraps to a huge `u32` and makes
+/// `from_timestamp` panic.
+#[inline]
+fn split_timestamp(ts: i64, units_per_sec: i64) -> (i64, u32) {
+    let secs = ts.div_euclid(units_per_sec);
+    let nanos_per_unit = 1_000_000_000 / units_per_sec;
+    // rem_euclid is in 0..units_per_sec, so the product is < 1e9 and fits u32.
+    let nsecs = ts.rem_euclid(units_per_sec) * nanos_per_unit;
+    (secs, nsecs as u32)
+}
+
+/// Shifts a second-resolution timestamp by the interval `scalar`.
+#[inline]
+fn seconds_add(ts_s: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    Ok(do_data_time_math(ts_s, 0, scalar, sign)?.timestamp())
+}
+
+/// Shifts a millisecond-resolution timestamp by the interval `scalar`.
+#[inline]
+fn milliseconds_add(ts_ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let (secs, nsecs) = split_timestamp(ts_ms, 1_000);
+    Ok(do_data_time_math(secs, nsecs, scalar, sign)?.timestamp_millis())
+}
+
+/// Shifts a microsecond-resolution timestamp by the interval `scalar`.
+#[inline]
+fn microseconds_add(ts_us: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let (secs, nsecs) = split_timestamp(ts_us, 1_000_000);
+    let shifted = do_data_time_math(secs, nsecs, scalar, sign)?;
+    // Compose micros from whole seconds + sub-second micros. Going through
+    // `timestamp_nanos()` would limit results to the i64-nanosecond range
+    // (~1677..2262), far narrower than what microsecond timestamps can hold.
+    Ok(shifted.timestamp() * 1_000_000 + i64::from(shifted.timestamp_subsec_micros()))
+}
+
+/// Shifts a nanosecond-resolution timestamp by the interval `scalar`.
+#[inline]
+fn nanoseconds_add(ts_ns: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
+    let (secs, nsecs) = split_timestamp(ts_ns, 1_000_000_000);
+    Ok(do_data_time_math(secs, nsecs, scalar, sign)?.timestamp_nanos())
+}
+
+/// Builds a `NaiveDateTime` from `secs` since the epoch plus `nsecs`
+/// nanoseconds, then shifts it by the interval `scalar` via `do_date_math`.
+///
+/// NOTE(review): per chrono's docs, `NaiveDateTime::from_timestamp` panics on
+/// out-of-range input (including `nsecs >= 2_000_000_000`).
+#[inline]
+fn do_data_time_math(
+    secs: i64,
+    nsecs: u32,
+    scalar: &ScalarValue,
+    sign: i32,
+) -> Result<NaiveDateTime> {
+    do_date_math(NaiveDateTime::from_timestamp(secs, nsecs), scalar, sign)
+}
+
+/// Shifts `prior` (a date or date-time) by the interval held in `scalar`,
+/// with `sign` applied to the interval's components.
+///
+/// Supports day-time, year-month and month-day-nano intervals.
+///
+/// # Errors
+///
+/// Returns `DataFusionError::Execution` for any other scalar, including a
+/// NULL interval.
+fn do_date_math<D>(prior: D, scalar: &ScalarValue, sign: i32) -> Result<D>
+where
+    D: Datelike + Add<Duration, Output = D>,
+{
+    match *scalar {
+        ScalarValue::IntervalDayTime(Some(day_time)) => {
+            Ok(add_day_time(prior, day_time, sign))
+        }
+        ScalarValue::IntervalYearMonth(Some(months)) => {
+            Ok(shift_months(prior, months * sign))
+        }
+        ScalarValue::IntervalMonthDayNano(Some(mdn)) => {
+            Ok(add_m_d_nano(prior, mdn, sign))
+        }
+        ref other => Err(DataFusionError::Execution(format!(
+            "DateIntervalExpr does not support non-interval type {:?}",
+            other
+        ))),
+    }
+}
+
// Can remove once https://github.com/apache/arrow-rs/pull/2031 is released
-fn add_m_d_nano(prior: NaiveDate, interval: i128, sign: i32) -> NaiveDate {
+fn add_m_d_nano<D>(prior: D, interval: i128, sign: i32) -> D
+where
+ D: Datelike + Add<Duration, Output = D>,
+{
let interval = interval as u128;
let nanos = (interval >> 64) as i64 * sign as i64;
let days = (interval >> 32) as i32 * sign;
@@ -172,7 +320,10 @@ fn add_m_d_nano(prior: NaiveDate, interval: i128, sign: i32) -> NaiveDate {
}
// Can remove once https://github.com/apache/arrow-rs/pull/2031 is released
-fn add_day_time(prior: NaiveDate, interval: i64, sign: i32) -> NaiveDate {
+fn add_day_time<D>(prior: D, interval: i64, sign: i32) -> D
+where
+ D: Datelike + Add<Duration, Output = D>,
+{
let interval = interval as u64;
let days = (interval >> 32) as i32 * sign;
let ms = interval as i32 * sign;
@@ -187,7 +338,7 @@ mod tests {
use crate::execution_props::ExecutionProps;
use arrow::array::{ArrayRef, Date32Builder};
use arrow::datatypes::*;
- use datafusion_common::{Result, ToDFSchema};
+ use datafusion_common::{Column, Result, ToDFSchema};
use datafusion_expr::Expr;
#[test]
@@ -362,6 +513,165 @@ mod tests {
Ok(())
}
+    #[test]
+    fn add_1_millisecond() -> Result<()> {
+        // Nanosecond timestamp literal at the current wall-clock time.
+        let now_ts_ns = chrono::Utc::now().timestamp_nanos();
+        let lhs = Expr::Literal(ScalarValue::TimestampNanosecond(Some(now_ts_ns), None));
+        // Interval of 0 days and 1 millisecond.
+        let rhs = Expr::Literal(ScalarValue::IntervalDayTime(Some(create_day_time(0, 1))));
+
+        if let ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(ts), None)) =
+            exercise(&lhs, Operator::Plus, &rhs)?
+        {
+            assert_eq!(ts, now_ts_ns + 1_000_000);
+            Ok(())
+        } else {
+            Err(DataFusionError::NotImplemented(
+                "Unexpected result!".to_string(),
+            ))
+        }
+    }
+
+    #[test]
+    fn add_2_hours() -> Result<()> {
+        // Second-resolution timestamp literal at the current wall-clock time.
+        let now_ts_s = chrono::Utc::now().timestamp();
+        let two_hours_ms = 2 * 3600 * 1_000;
+        let lhs = Expr::Literal(ScalarValue::TimestampSecond(Some(now_ts_s), None));
+        let rhs = Expr::Literal(ScalarValue::IntervalDayTime(Some(create_day_time(
+            0,
+            two_hours_ms,
+        ))));
+
+        match exercise(&lhs, Operator::Plus, &rhs)? {
+            ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(ts), None)) => {
+                assert_eq!(ts, now_ts_s + 2 * 3600);
+                Ok(())
+            }
+            _ => Err(DataFusionError::NotImplemented(
+                "Unexpected result!".to_string(),
+            )),
+        }
+    }
+
+    #[test]
+    fn sub_4_hours() -> Result<()> {
+        // Second-resolution timestamp literal at the current wall-clock time.
+        let now_ts_s = chrono::Utc::now().timestamp();
+        let four_hours_ms = 4 * 3600 * 1_000;
+        let lhs = Expr::Literal(ScalarValue::TimestampSecond(Some(now_ts_s), None));
+        let rhs = Expr::Literal(ScalarValue::IntervalDayTime(Some(create_day_time(
+            0,
+            four_hours_ms,
+        ))));
+
+        match exercise(&lhs, Operator::Minus, &rhs)? {
+            ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(ts), None)) => {
+                assert_eq!(ts, now_ts_s - 4 * 3600);
+                Ok(())
+            }
+            _ => Err(DataFusionError::NotImplemented(
+                "Unexpected result!".to_string(),
+            )),
+        }
+    }
+
+    #[test]
+    fn add_8_days() -> Result<()> {
+        // Nanosecond timestamp literal at the current wall-clock time.
+        let now_ts_ns = chrono::Utc::now().timestamp_nanos();
+        let lhs = Expr::Literal(ScalarValue::TimestampNanosecond(Some(now_ts_ns), None));
+        // Interval of 8 days and 0 milliseconds.
+        let rhs = Expr::Literal(ScalarValue::IntervalDayTime(Some(create_day_time(8, 0))));
+
+        if let ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(ts), None)) =
+            exercise(&lhs, Operator::Plus, &rhs)?
+        {
+            assert_eq!(ts, now_ts_ns + 8 * 86400 * 1_000_000_000);
+            Ok(())
+        } else {
+            Err(DataFusionError::NotImplemented(
+                "Unexpected result!".to_string(),
+            ))
+        }
+    }
+
+    #[test]
+    fn sub_16_days() -> Result<()> {
+        // Nanosecond timestamp literal at the current wall-clock time.
+        let now_ts_ns = chrono::Utc::now().timestamp_nanos();
+        let lhs = Expr::Literal(ScalarValue::TimestampNanosecond(Some(now_ts_ns), None));
+        // Interval of 16 days and 0 milliseconds.
+        let rhs = Expr::Literal(ScalarValue::IntervalDayTime(Some(create_day_time(16, 0))));
+
+        if let ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(ts), None)) =
+            exercise(&lhs, Operator::Minus, &rhs)?
+        {
+            assert_eq!(ts, now_ts_ns - 16 * 86400 * 1_000_000_000);
+            Ok(())
+        } else {
+            Err(DataFusionError::NotImplemented(
+                "Unexpected result!".to_string(),
+            ))
+        }
+    }
+
+    #[test]
+    fn array_add_26_days() -> Result<()> {
+        // Input column `a`: Date32 values for days 0..=7 since the epoch.
+        let mut input = Date32Builder::new(8);
+        input.append_slice(&[0, 1, 2, 3, 4, 5, 6, 7]);
+        let column: ArrayRef = Arc::new(input.finish());
+
+        let schema = Schema::new(vec![Field::new("a", DataType::Date32, false)]);
+        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![column])?;
+        let df_schema = schema.clone().to_dfschema()?;
+        let props = ExecutionProps::new();
+
+        // a + (26 days, 0 ms)
+        let column_expr = Expr::Column(Column::from_name("a"));
+        let interval_expr =
+            Expr::Literal(ScalarValue::IntervalDayTime(Some(create_day_time(26, 0))));
+        let lhs = create_physical_expr(&column_expr, &df_schema, &schema, &props)?;
+        let rhs = create_physical_expr(&interval_expr, &df_schema, &schema, &props)?;
+        let res = DateIntervalExpr::try_new(lhs, Operator::Plus, rhs, &schema)?
+            .evaluate(&batch)?;
+
+        let mut want = Date32Builder::new(8);
+        want.append_slice(&[26, 27, 28, 29, 30, 31, 32, 33]);
+        let expected: ArrayRef = Arc::new(want.finish());
+
+        if let ColumnarValue::Array(array) = res {
+            assert_eq!(&array, &expected);
+            Ok(())
+        } else {
+            Err(DataFusionError::NotImplemented(
+                "Unexpected result!".to_string(),
+            ))
+        }
+    }
+
#[test]
fn invalid_interval() -> Result<()> {
// setup
diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs
index a7f774073..ffb3f2939 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -90,7 +90,7 @@ pub fn create_physical_expr(
rhs.data_type(input_schema)?,
) {
(
- DataType::Date32 | DataType::Date64,
+ DataType::Date32 | DataType::Date64 | DataType::Timestamp(_, _),
Operator::Plus | Operator::Minus,
DataType::Interval(_),
) => Ok(Arc::new(DateIntervalExpr::try_new(