You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/12/21 14:19:49 UTC
[arrow-datafusion] branch master updated: Add Timezone to Scalar::Time* types, and better timezone awareness to Datafusion's time types (#1455)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 5668be7 Add Timezone to Scalar::Time* types, and better timezone awareness to Datafusion's time types (#1455)
5668be7 is described below
commit 5668be78a9ccbf5469e9e95ad070920ca5d105ba
Author: Max Burke <ma...@urbanlogiq.com>
AuthorDate: Tue Dec 21 06:19:41 2021 -0800
Add Timezone to Scalar::Time* types, and better timezone awareness to Datafusion's time types (#1455)
* point to UL repos
* Make ScalarValue::TimestampNanosecond moderately timezone aware
* cargo fmt
* fix ballista build
* fix ballista tests
* ScalarValue is only 64 bytes on aarch64; it is still 48 bytes on amd64
* remove debugging code
* add tests for timestamp coercion
* add a min/max test on mixed timestamp types, allow creation of timestamp tables with a timezone, and fix a missed case in the binary ops applied to timestamp types with timezones
---
.../rust/core/src/serde/logical_plan/from_proto.rs | 20 +-
ballista/rust/core/src/serde/logical_plan/mod.rs | 18 +-
.../rust/core/src/serde/logical_plan/to_proto.rs | 4 +-
datafusion/src/logical_plan/expr.rs | 10 +-
datafusion/src/optimizer/simplify_expressions.rs | 4 +-
.../src/physical_plan/datetime_expressions.rs | 8 +-
datafusion/src/physical_plan/expressions/binary.rs | 22 +-
.../src/physical_plan/expressions/coercion.rs | 37 ++
.../src/physical_plan/expressions/min_max.rs | 65 +++-
datafusion/src/physical_plan/functions.rs | 5 +-
datafusion/src/physical_plan/hash_utils.rs | 2 +-
datafusion/src/physical_plan/planner.rs | 1 +
datafusion/src/physical_plan/sort.rs | 11 +-
datafusion/src/scalar.rs | 394 +++++++++++++++------
datafusion/tests/sql.rs | 389 +++++++++++++++++++-
15 files changed, 810 insertions(+), 180 deletions(-)
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index ba40488..dfac547 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -494,10 +494,10 @@ fn typechecked_scalar_value_conversion(
ScalarValue::Date32(Some(*v))
}
(Value::TimeMicrosecondValue(v), PrimitiveScalarType::TimeMicrosecond) => {
- ScalarValue::TimestampMicrosecond(Some(*v))
+ ScalarValue::TimestampMicrosecond(Some(*v), None)
}
(Value::TimeNanosecondValue(v), PrimitiveScalarType::TimeMicrosecond) => {
- ScalarValue::TimestampNanosecond(Some(*v))
+ ScalarValue::TimestampNanosecond(Some(*v), None)
}
(Value::Utf8Value(v), PrimitiveScalarType::Utf8) => {
ScalarValue::Utf8(Some(v.to_owned()))
@@ -530,10 +530,10 @@ fn typechecked_scalar_value_conversion(
PrimitiveScalarType::LargeUtf8 => ScalarValue::LargeUtf8(None),
PrimitiveScalarType::Date32 => ScalarValue::Date32(None),
PrimitiveScalarType::TimeMicrosecond => {
- ScalarValue::TimestampMicrosecond(None)
+ ScalarValue::TimestampMicrosecond(None, None)
}
PrimitiveScalarType::TimeNanosecond => {
- ScalarValue::TimestampNanosecond(None)
+ ScalarValue::TimestampNanosecond(None, None)
}
PrimitiveScalarType::Null => {
return Err(proto_error(
@@ -593,10 +593,10 @@ impl TryInto<datafusion::scalar::ScalarValue> for &protobuf::scalar_value::Value
ScalarValue::Date32(Some(*v))
}
protobuf::scalar_value::Value::TimeMicrosecondValue(v) => {
- ScalarValue::TimestampMicrosecond(Some(*v))
+ ScalarValue::TimestampMicrosecond(Some(*v), None)
}
protobuf::scalar_value::Value::TimeNanosecondValue(v) => {
- ScalarValue::TimestampNanosecond(Some(*v))
+ ScalarValue::TimestampNanosecond(Some(*v), None)
}
protobuf::scalar_value::Value::ListValue(v) => v.try_into()?,
protobuf::scalar_value::Value::NullListValue(v) => {
@@ -758,10 +758,10 @@ impl TryInto<datafusion::scalar::ScalarValue> for protobuf::PrimitiveScalarType
protobuf::PrimitiveScalarType::LargeUtf8 => ScalarValue::LargeUtf8(None),
protobuf::PrimitiveScalarType::Date32 => ScalarValue::Date32(None),
protobuf::PrimitiveScalarType::TimeMicrosecond => {
- ScalarValue::TimestampMicrosecond(None)
+ ScalarValue::TimestampMicrosecond(None, None)
}
protobuf::PrimitiveScalarType::TimeNanosecond => {
- ScalarValue::TimestampNanosecond(None)
+ ScalarValue::TimestampNanosecond(None, None)
}
})
}
@@ -811,10 +811,10 @@ impl TryInto<datafusion::scalar::ScalarValue> for &protobuf::ScalarValue {
ScalarValue::Date32(Some(*v))
}
protobuf::scalar_value::Value::TimeMicrosecondValue(v) => {
- ScalarValue::TimestampMicrosecond(Some(*v))
+ ScalarValue::TimestampMicrosecond(Some(*v), None)
}
protobuf::scalar_value::Value::TimeNanosecondValue(v) => {
- ScalarValue::TimestampNanosecond(Some(*v))
+ ScalarValue::TimestampNanosecond(Some(*v), None)
}
protobuf::scalar_value::Value::ListValue(scalar_list) => {
let protobuf::ScalarListValue {
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index a5e2aa0..a0f481a 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -216,8 +216,8 @@ mod roundtrip_tests {
ScalarValue::LargeUtf8(None),
ScalarValue::List(None, Box::new(DataType::Boolean)),
ScalarValue::Date32(None),
- ScalarValue::TimestampMicrosecond(None),
- ScalarValue::TimestampNanosecond(None),
+ ScalarValue::TimestampMicrosecond(None, None),
+ ScalarValue::TimestampNanosecond(None, None),
ScalarValue::Boolean(Some(true)),
ScalarValue::Boolean(Some(false)),
ScalarValue::Float32(Some(1.0)),
@@ -256,11 +256,11 @@ mod roundtrip_tests {
ScalarValue::LargeUtf8(Some(String::from("Test Large utf8"))),
ScalarValue::Date32(Some(0)),
ScalarValue::Date32(Some(i32::MAX)),
- ScalarValue::TimestampNanosecond(Some(0)),
- ScalarValue::TimestampNanosecond(Some(i64::MAX)),
- ScalarValue::TimestampMicrosecond(Some(0)),
- ScalarValue::TimestampMicrosecond(Some(i64::MAX)),
- ScalarValue::TimestampMicrosecond(None),
+ ScalarValue::TimestampNanosecond(Some(0), None),
+ ScalarValue::TimestampNanosecond(Some(i64::MAX), None),
+ ScalarValue::TimestampMicrosecond(Some(0), None),
+ ScalarValue::TimestampMicrosecond(Some(i64::MAX), None),
+ ScalarValue::TimestampMicrosecond(None, None),
ScalarValue::List(
Some(Box::new(vec![
ScalarValue::Float32(Some(-213.1)),
@@ -619,8 +619,8 @@ mod roundtrip_tests {
ScalarValue::Utf8(None),
ScalarValue::LargeUtf8(None),
ScalarValue::Date32(None),
- ScalarValue::TimestampMicrosecond(None),
- ScalarValue::TimestampNanosecond(None),
+ ScalarValue::TimestampMicrosecond(None, None),
+ ScalarValue::TimestampNanosecond(None, None),
//ScalarValue::List(None, DataType::Boolean)
];
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 68ed709..47b5df4 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -652,12 +652,12 @@ impl TryFrom<&datafusion::scalar::ScalarValue> for protobuf::ScalarValue {
datafusion::scalar::ScalarValue::Date32(val) => {
create_proto_scalar(val, PrimitiveScalarType::Date32, |s| Value::Date32Value(*s))
}
- datafusion::scalar::ScalarValue::TimestampMicrosecond(val) => {
+ datafusion::scalar::ScalarValue::TimestampMicrosecond(val, _) => {
create_proto_scalar(val, PrimitiveScalarType::TimeMicrosecond, |s| {
Value::TimeMicrosecondValue(*s)
})
}
- datafusion::scalar::ScalarValue::TimestampNanosecond(val) => {
+ datafusion::scalar::ScalarValue::TimestampNanosecond(val, _) => {
create_proto_scalar(val, PrimitiveScalarType::TimeNanosecond, |s| {
Value::TimeNanosecondValue(*s)
})
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index bcdfae7..fc862cd 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -1478,9 +1478,10 @@ macro_rules! make_timestamp_literal {
#[doc = $DOC]
impl TimestampLiteral for $TYPE {
fn lit_timestamp_nano(&self) -> Expr {
- Expr::Literal(ScalarValue::TimestampNanosecond(Some(
- (self.clone()).into(),
- )))
+ Expr::Literal(ScalarValue::TimestampNanosecond(
+ Some((self.clone()).into()),
+ None,
+ ))
}
}
};
@@ -2048,7 +2049,8 @@ mod tests {
#[test]
fn test_lit_timestamp_nano() {
let expr = col("time").eq(lit_timestamp_nano(10)); // 10 is an implicit i32
- let expected = col("time").eq(lit(ScalarValue::TimestampNanosecond(Some(10))));
+ let expected =
+ col("time").eq(lit(ScalarValue::TimestampNanosecond(Some(10), None)));
assert_eq!(expr, expected);
let i: i64 = 10;
diff --git a/datafusion/src/optimizer/simplify_expressions.rs b/datafusion/src/optimizer/simplify_expressions.rs
index 0ca9212..ff2c05c 100644
--- a/datafusion/src/optimizer/simplify_expressions.rs
+++ b/datafusion/src/optimizer/simplify_expressions.rs
@@ -1703,7 +1703,7 @@ mod tests {
.build()
.unwrap();
- let expected = "Projection: TimestampNanosecond(1599566400000000000) AS totimestamp(Utf8(\"2020-09-08T12:00:00+00:00\"))\
+ let expected = "Projection: TimestampNanosecond(1599566400000000000, None) AS totimestamp(Utf8(\"2020-09-08T12:00:00+00:00\"))\
\n TableScan: test projection=None"
.to_string();
let actual = get_optimized_plan_formatted(&plan, &Utc::now());
@@ -1780,7 +1780,7 @@ mod tests {
// expect the same timestamp appears in both exprs
let actual = get_optimized_plan_formatted(&plan, &time);
let expected = format!(
- "Projection: TimestampNanosecond({}) AS now(), TimestampNanosecond({}) AS t2\
+ "Projection: TimestampNanosecond({}, Some(\"UTC\")) AS now(), TimestampNanosecond({}, Some(\"UTC\")) AS t2\
\n TableScan: test projection=None",
time.timestamp_nanos(),
time.timestamp_nanos()
diff --git a/datafusion/src/physical_plan/datetime_expressions.rs b/datafusion/src/physical_plan/datetime_expressions.rs
index d103127..6af2f66 100644
--- a/datafusion/src/physical_plan/datetime_expressions.rs
+++ b/datafusion/src/physical_plan/datetime_expressions.rs
@@ -181,6 +181,7 @@ pub fn make_now(
move |_arg| {
Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
now_ts,
+ Some("UTC".to_owned()),
)))
}
}
@@ -240,8 +241,11 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let f = |x: Option<i64>| x.map(|x| date_trunc_single(granularity, x)).transpose();
Ok(match array {
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v)) => {
- ColumnarValue::Scalar(ScalarValue::TimestampNanosecond((f)(*v)?))
+ ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
+ ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
+ (f)(*v)?,
+ tz_opt.clone(),
+ ))
}
ColumnarValue::Array(array) => {
let array = array
diff --git a/datafusion/src/physical_plan/expressions/binary.rs b/datafusion/src/physical_plan/expressions/binary.rs
index d8bae7d..bd593fd 100644
--- a/datafusion/src/physical_plan/expressions/binary.rs
+++ b/datafusion/src/physical_plan/expressions/binary.rs
@@ -329,16 +329,16 @@ macro_rules! binary_array_op_scalar {
DataType::Float32 => compute_op_scalar!($LEFT, $RIGHT, $OP, Float32Array),
DataType::Float64 => compute_op_scalar!($LEFT, $RIGHT, $OP, Float64Array),
DataType::Utf8 => compute_utf8_op_scalar!($LEFT, $RIGHT, $OP, StringArray),
- DataType::Timestamp(TimeUnit::Nanosecond, None) => {
+ DataType::Timestamp(TimeUnit::Nanosecond, _) => {
compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
}
- DataType::Timestamp(TimeUnit::Microsecond, None) => {
+ DataType::Timestamp(TimeUnit::Microsecond, _) => {
compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray)
}
- DataType::Timestamp(TimeUnit::Millisecond, None) => {
+ DataType::Timestamp(TimeUnit::Millisecond, _) => {
compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMillisecondArray)
}
- DataType::Timestamp(TimeUnit::Second, None) => {
+ DataType::Timestamp(TimeUnit::Second, _) => {
compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampSecondArray)
}
DataType::Date32 => {
@@ -374,16 +374,16 @@ macro_rules! binary_array_op {
DataType::Float32 => compute_op!($LEFT, $RIGHT, $OP, Float32Array),
DataType::Float64 => compute_op!($LEFT, $RIGHT, $OP, Float64Array),
DataType::Utf8 => compute_utf8_op!($LEFT, $RIGHT, $OP, StringArray),
- DataType::Timestamp(TimeUnit::Nanosecond, None) => {
+ DataType::Timestamp(TimeUnit::Nanosecond, _) => {
compute_op!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
}
- DataType::Timestamp(TimeUnit::Microsecond, None) => {
+ DataType::Timestamp(TimeUnit::Microsecond, _) => {
compute_op!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray)
}
- DataType::Timestamp(TimeUnit::Millisecond, None) => {
+ DataType::Timestamp(TimeUnit::Millisecond, _) => {
compute_op!($LEFT, $RIGHT, $OP, TimestampMillisecondArray)
}
- DataType::Timestamp(TimeUnit::Second, None) => {
+ DataType::Timestamp(TimeUnit::Second, _) => {
compute_op!($LEFT, $RIGHT, $OP, TimestampSecondArray)
}
DataType::Date32 => {
@@ -541,12 +541,14 @@ fn common_binary_type(
// re-write the error message of failed coercions to include the operator's information
match result {
- None => Err(DataFusionError::Plan(
+ None => {
+ Err(DataFusionError::Plan(
format!(
"'{:?} {} {:?}' can't be evaluated because there isn't a common type to coerce the types to",
lhs_type, op, rhs_type
),
- )),
+ ))
+ },
Some(t) => Ok(t)
}
}
diff --git a/datafusion/src/physical_plan/expressions/coercion.rs b/datafusion/src/physical_plan/expressions/coercion.rs
index 180b165..a449a8d 100644
--- a/datafusion/src/physical_plan/expressions/coercion.rs
+++ b/datafusion/src/physical_plan/expressions/coercion.rs
@@ -100,11 +100,48 @@ pub fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataTyp
/// casted to for the purpose of a date computation
pub fn temporal_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
use arrow::datatypes::DataType::*;
+ use arrow::datatypes::TimeUnit;
match (lhs_type, rhs_type) {
(Utf8, Date32) => Some(Date32),
(Date32, Utf8) => Some(Date32),
(Utf8, Date64) => Some(Date64),
(Date64, Utf8) => Some(Date64),
+ (Timestamp(lhs_unit, lhs_tz), Timestamp(rhs_unit, rhs_tz)) => {
+ let tz = match (lhs_tz, rhs_tz) {
+ // can't cast across timezones
+ (Some(lhs_tz), Some(rhs_tz)) => {
+ if lhs_tz != rhs_tz {
+ return None;
+ } else {
+ Some(lhs_tz.clone())
+ }
+ }
+ (Some(lhs_tz), None) => Some(lhs_tz.clone()),
+ (None, Some(rhs_tz)) => Some(rhs_tz.clone()),
+ (None, None) => None,
+ };
+
+ let unit = match (lhs_unit, rhs_unit) {
+ (TimeUnit::Second, TimeUnit::Millisecond) => TimeUnit::Second,
+ (TimeUnit::Second, TimeUnit::Microsecond) => TimeUnit::Second,
+ (TimeUnit::Second, TimeUnit::Nanosecond) => TimeUnit::Second,
+ (TimeUnit::Millisecond, TimeUnit::Second) => TimeUnit::Second,
+ (TimeUnit::Millisecond, TimeUnit::Microsecond) => TimeUnit::Millisecond,
+ (TimeUnit::Millisecond, TimeUnit::Nanosecond) => TimeUnit::Millisecond,
+ (TimeUnit::Microsecond, TimeUnit::Second) => TimeUnit::Second,
+ (TimeUnit::Microsecond, TimeUnit::Millisecond) => TimeUnit::Millisecond,
+ (TimeUnit::Microsecond, TimeUnit::Nanosecond) => TimeUnit::Microsecond,
+ (TimeUnit::Nanosecond, TimeUnit::Second) => TimeUnit::Second,
+ (TimeUnit::Nanosecond, TimeUnit::Millisecond) => TimeUnit::Millisecond,
+ (TimeUnit::Nanosecond, TimeUnit::Microsecond) => TimeUnit::Microsecond,
+ (l, r) => {
+ assert_eq!(l, r);
+ l.clone()
+ }
+ };
+
+ Some(Timestamp(unit, tz))
+ }
_ => None,
}
}
diff --git a/datafusion/src/physical_plan/expressions/min_max.rs b/datafusion/src/physical_plan/expressions/min_max.rs
index 2f61881..8f6cd45 100644
--- a/datafusion/src/physical_plan/expressions/min_max.rs
+++ b/datafusion/src/physical_plan/expressions/min_max.rs
@@ -129,6 +129,12 @@ macro_rules! typed_min_max_batch {
let value = compute::$OP(array);
ScalarValue::$SCALAR(value)
}};
+
+ ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident, $TZ:expr) => {{
+ let array = $VALUES.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
+ let value = compute::$OP(array);
+ ScalarValue::$SCALAR(value, $TZ.clone())
+ }};
}
// TODO implement this in arrow-rs with simd
@@ -189,26 +195,35 @@ macro_rules! min_max_batch {
DataType::UInt32 => typed_min_max_batch!($VALUES, UInt32Array, UInt32, $OP),
DataType::UInt16 => typed_min_max_batch!($VALUES, UInt16Array, UInt16, $OP),
DataType::UInt8 => typed_min_max_batch!($VALUES, UInt8Array, UInt8, $OP),
- DataType::Timestamp(TimeUnit::Second, _) => {
- typed_min_max_batch!($VALUES, TimestampSecondArray, TimestampSecond, $OP)
+ DataType::Timestamp(TimeUnit::Second, tz_opt) => {
+ typed_min_max_batch!(
+ $VALUES,
+ TimestampSecondArray,
+ TimestampSecond,
+ $OP,
+ tz_opt
+ )
}
- DataType::Timestamp(TimeUnit::Millisecond, _) => typed_min_max_batch!(
+ DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => typed_min_max_batch!(
$VALUES,
TimestampMillisecondArray,
TimestampMillisecond,
- $OP
+ $OP,
+ tz_opt
),
- DataType::Timestamp(TimeUnit::Microsecond, _) => typed_min_max_batch!(
+ DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => typed_min_max_batch!(
$VALUES,
TimestampMicrosecondArray,
TimestampMicrosecond,
- $OP
+ $OP,
+ tz_opt
),
- DataType::Timestamp(TimeUnit::Nanosecond, _) => typed_min_max_batch!(
+ DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => typed_min_max_batch!(
$VALUES,
TimestampNanosecondArray,
TimestampNanosecond,
- $OP
+ $OP,
+ tz_opt
),
DataType::Date32 => typed_min_max_batch!($VALUES, Date32Array, Date32, $OP),
DataType::Date64 => typed_min_max_batch!($VALUES, Date64Array, Date64, $OP),
@@ -273,6 +288,18 @@ macro_rules! typed_min_max {
(Some(a), Some(b)) => Some((*a).$OP(*b)),
})
}};
+
+ ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident, $TZ:expr) => {{
+ ScalarValue::$SCALAR(
+ match ($VALUE, $DELTA) {
+ (None, None) => None,
+ (Some(a), None) => Some(a.clone()),
+ (None, Some(b)) => Some(b.clone()),
+ (Some(a), Some(b)) => Some((*a).$OP(*b)),
+ },
+ $TZ.clone(),
+ )
+ }};
}
// min/max of two scalar string values.
@@ -337,26 +364,26 @@ macro_rules! min_max {
(ScalarValue::LargeUtf8(lhs), ScalarValue::LargeUtf8(rhs)) => {
typed_min_max_string!(lhs, rhs, LargeUtf8, $OP)
}
- (ScalarValue::TimestampSecond(lhs), ScalarValue::TimestampSecond(rhs)) => {
- typed_min_max!(lhs, rhs, TimestampSecond, $OP)
+ (ScalarValue::TimestampSecond(lhs, l_tz), ScalarValue::TimestampSecond(rhs, _)) => {
+ typed_min_max!(lhs, rhs, TimestampSecond, $OP, l_tz)
}
(
- ScalarValue::TimestampMillisecond(lhs),
- ScalarValue::TimestampMillisecond(rhs),
+ ScalarValue::TimestampMillisecond(lhs, l_tz),
+ ScalarValue::TimestampMillisecond(rhs, _),
) => {
- typed_min_max!(lhs, rhs, TimestampMillisecond, $OP)
+ typed_min_max!(lhs, rhs, TimestampMillisecond, $OP, l_tz)
}
(
- ScalarValue::TimestampMicrosecond(lhs),
- ScalarValue::TimestampMicrosecond(rhs),
+ ScalarValue::TimestampMicrosecond(lhs, l_tz),
+ ScalarValue::TimestampMicrosecond(rhs, _),
) => {
- typed_min_max!(lhs, rhs, TimestampMicrosecond, $OP)
+ typed_min_max!(lhs, rhs, TimestampMicrosecond, $OP, l_tz)
}
(
- ScalarValue::TimestampNanosecond(lhs),
- ScalarValue::TimestampNanosecond(rhs),
+ ScalarValue::TimestampNanosecond(lhs, l_tz),
+ ScalarValue::TimestampNanosecond(rhs, _),
) => {
- typed_min_max!(lhs, rhs, TimestampNanosecond, $OP)
+ typed_min_max!(lhs, rhs, TimestampNanosecond, $OP, l_tz)
}
(
ScalarValue::Date32(lhs),
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index 9c59b96..df073b6 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -612,7 +612,10 @@ pub fn return_type(
BuiltinScalarFunction::ToTimestampSeconds => {
Ok(DataType::Timestamp(TimeUnit::Second, None))
}
- BuiltinScalarFunction::Now => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
+ BuiltinScalarFunction::Now => Ok(DataType::Timestamp(
+ TimeUnit::Nanosecond,
+ Some("UTC".to_owned()),
+ )),
BuiltinScalarFunction::Translate => {
utf8_to_str_type(&input_expr_types[0], "translate")
}
diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs
index fbd0c97..25d1f3f 100644
--- a/datafusion/src/physical_plan/hash_utils.rs
+++ b/datafusion/src/physical_plan/hash_utils.rs
@@ -369,7 +369,7 @@ pub fn create_hashes<'a>(
multi_col
);
}
- DataType::Timestamp(TimeUnit::Nanosecond, None) => {
+ DataType::Timestamp(TimeUnit::Nanosecond, _) => {
hash_array_primitive!(
TimestampNanosecondArray,
col,
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 1302369..6d913ac 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -632,6 +632,7 @@ impl DefaultPhysicalPlanner {
let physical_input = self.create_initial_plan(input, ctx_state).await?;
let input_schema = physical_input.as_ref().schema();
let input_dfschema = input.as_ref().schema();
+
let runtime_expr = self.create_physical_expr(
predicate,
input_dfschema,
diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs
index 5eb29bb..e8898c1 100644
--- a/datafusion/src/physical_plan/sort.rs
+++ b/datafusion/src/physical_plan/sort.rs
@@ -29,7 +29,7 @@ use crate::physical_plan::{
};
pub use arrow::compute::SortOptions;
use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, error::ArrowError};
@@ -201,6 +201,15 @@ fn sort_batch(
None,
)?;
+ let schema = Arc::new(Schema::new(
+ schema
+ .fields()
+ .iter()
+ .zip(batch.columns().iter().map(|col| col.data_type()))
+ .map(|(field, ty)| Field::new(field.name(), ty.clone(), field.is_nullable()))
+ .collect::<Vec<_>>(),
+ ));
+
// reorder all rows based on sorted indices
RecordBatch::try_new(
schema,
diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs
index 35ebb2a..cdcf11e 100644
--- a/datafusion/src/scalar.rs
+++ b/datafusion/src/scalar.rs
@@ -20,6 +20,7 @@
use crate::error::{DataFusionError, Result};
use arrow::{
array::*,
+ compute::kernels::cast::cast,
datatypes::{
ArrowDictionaryKeyType, ArrowNativeType, DataType, Field, Float32Type,
Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, IntervalUnit, TimeUnit,
@@ -82,13 +83,13 @@ pub enum ScalarValue {
/// Date stored as a signed 64bit int
Date64(Option<i64>),
/// Timestamp Second
- TimestampSecond(Option<i64>),
+ TimestampSecond(Option<i64>, Option<String>),
/// Timestamp Milliseconds
- TimestampMillisecond(Option<i64>),
+ TimestampMillisecond(Option<i64>, Option<String>),
/// Timestamp Microseconds
- TimestampMicrosecond(Option<i64>),
+ TimestampMicrosecond(Option<i64>, Option<String>),
/// Timestamp Nanoseconds
- TimestampNanosecond(Option<i64>),
+ TimestampNanosecond(Option<i64>, Option<String>),
/// Interval with YearMonth unit
IntervalYearMonth(Option<i32>),
/// Interval with DayTime unit
@@ -155,14 +156,14 @@ impl PartialEq for ScalarValue {
(Date32(_), _) => false,
(Date64(v1), Date64(v2)) => v1.eq(v2),
(Date64(_), _) => false,
- (TimestampSecond(v1), TimestampSecond(v2)) => v1.eq(v2),
- (TimestampSecond(_), _) => false,
- (TimestampMillisecond(v1), TimestampMillisecond(v2)) => v1.eq(v2),
- (TimestampMillisecond(_), _) => false,
- (TimestampMicrosecond(v1), TimestampMicrosecond(v2)) => v1.eq(v2),
- (TimestampMicrosecond(_), _) => false,
- (TimestampNanosecond(v1), TimestampNanosecond(v2)) => v1.eq(v2),
- (TimestampNanosecond(_), _) => false,
+ (TimestampSecond(v1, _), TimestampSecond(v2, _)) => v1.eq(v2),
+ (TimestampSecond(_, _), _) => false,
+ (TimestampMillisecond(v1, _), TimestampMillisecond(v2, _)) => v1.eq(v2),
+ (TimestampMillisecond(_, _), _) => false,
+ (TimestampMicrosecond(v1, _), TimestampMicrosecond(v2, _)) => v1.eq(v2),
+ (TimestampMicrosecond(_, _), _) => false,
+ (TimestampNanosecond(v1, _), TimestampNanosecond(v2, _)) => v1.eq(v2),
+ (TimestampNanosecond(_, _), _) => false,
(IntervalYearMonth(v1), IntervalYearMonth(v2)) => v1.eq(v2),
(IntervalYearMonth(_), _) => false,
(IntervalDayTime(v1), IntervalDayTime(v2)) => v1.eq(v2),
@@ -241,14 +242,20 @@ impl PartialOrd for ScalarValue {
(Date32(_), _) => None,
(Date64(v1), Date64(v2)) => v1.partial_cmp(v2),
(Date64(_), _) => None,
- (TimestampSecond(v1), TimestampSecond(v2)) => v1.partial_cmp(v2),
- (TimestampSecond(_), _) => None,
- (TimestampMillisecond(v1), TimestampMillisecond(v2)) => v1.partial_cmp(v2),
- (TimestampMillisecond(_), _) => None,
- (TimestampMicrosecond(v1), TimestampMicrosecond(v2)) => v1.partial_cmp(v2),
- (TimestampMicrosecond(_), _) => None,
- (TimestampNanosecond(v1), TimestampNanosecond(v2)) => v1.partial_cmp(v2),
- (TimestampNanosecond(_), _) => None,
+ (TimestampSecond(v1, _), TimestampSecond(v2, _)) => v1.partial_cmp(v2),
+ (TimestampSecond(_, _), _) => None,
+ (TimestampMillisecond(v1, _), TimestampMillisecond(v2, _)) => {
+ v1.partial_cmp(v2)
+ }
+ (TimestampMillisecond(_, _), _) => None,
+ (TimestampMicrosecond(v1, _), TimestampMicrosecond(v2, _)) => {
+ v1.partial_cmp(v2)
+ }
+ (TimestampMicrosecond(_, _), _) => None,
+ (TimestampNanosecond(v1, _), TimestampNanosecond(v2, _)) => {
+ v1.partial_cmp(v2)
+ }
+ (TimestampNanosecond(_, _), _) => None,
(IntervalYearMonth(v1), IntervalYearMonth(v2)) => v1.partial_cmp(v2),
(IntervalYearMonth(_), _) => None,
(IntervalDayTime(v1), IntervalDayTime(v2)) => v1.partial_cmp(v2),
@@ -305,10 +312,10 @@ impl std::hash::Hash for ScalarValue {
}
Date32(v) => v.hash(state),
Date64(v) => v.hash(state),
- TimestampSecond(v) => v.hash(state),
- TimestampMillisecond(v) => v.hash(state),
- TimestampMicrosecond(v) => v.hash(state),
- TimestampNanosecond(v) => v.hash(state),
+ TimestampSecond(v, _) => v.hash(state),
+ TimestampMillisecond(v, _) => v.hash(state),
+ TimestampMicrosecond(v, _) => v.hash(state),
+ TimestampNanosecond(v, _) => v.hash(state),
IntervalYearMonth(v) => v.hash(state),
IntervalDayTime(v) => v.hash(state),
Struct(v, t) => {
@@ -344,6 +351,19 @@ fn get_dict_value<K: ArrowDictionaryKeyType>(
Ok((dict_array.values(), Some(values_index)))
}
+macro_rules! typed_cast_tz {
+ ($array:expr, $index:expr, $ARRAYTYPE:ident, $SCALAR:ident, $TZ:expr) => {{
+ let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
+ ScalarValue::$SCALAR(
+ match array.is_null($index) {
+ true => None,
+ false => Some(array.value($index).into()),
+ },
+ $TZ.clone(),
+ )
+ }};
+}
+
macro_rules! typed_cast {
($array:expr, $index:expr, $ARRAYTYPE:ident, $SCALAR:ident) => {{
let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
@@ -392,25 +412,25 @@ macro_rules! build_timestamp_list {
Some(values) => {
let values = values.as_ref();
match $TIME_UNIT {
- TimeUnit::Second => build_values_list!(
+ TimeUnit::Second => build_values_list_tz!(
TimestampSecondBuilder,
TimestampSecond,
values,
$SIZE
),
- TimeUnit::Microsecond => build_values_list!(
+ TimeUnit::Microsecond => build_values_list_tz!(
TimestampMillisecondBuilder,
TimestampMillisecond,
values,
$SIZE
),
- TimeUnit::Millisecond => build_values_list!(
+ TimeUnit::Millisecond => build_values_list_tz!(
TimestampMicrosecondBuilder,
TimestampMicrosecond,
values,
$SIZE
),
- TimeUnit::Nanosecond => build_values_list!(
+ TimeUnit::Nanosecond => build_values_list_tz!(
TimestampNanosecondBuilder,
TimestampNanosecond,
values,
@@ -445,6 +465,29 @@ macro_rules! build_values_list {
}};
}
+macro_rules! build_values_list_tz {
+ ($VALUE_BUILDER_TY:ident, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr) => {{
+ let mut builder = ListBuilder::new($VALUE_BUILDER_TY::new($VALUES.len()));
+
+ for _ in 0..$SIZE {
+ for scalar_value in $VALUES {
+ match scalar_value {
+ ScalarValue::$SCALAR_TY(Some(v), _) => {
+ builder.values().append_value(v.clone()).unwrap()
+ }
+ ScalarValue::$SCALAR_TY(None, _) => {
+ builder.values().append_null().unwrap();
+ }
+ _ => panic!("Incompatible ScalarValue for list"),
+ };
+ }
+ builder.append(true).unwrap();
+ }
+
+ builder.finish()
+ }};
+}
+
macro_rules! build_array_from_option {
($DATA_TYPE:ident, $ARRAY_TYPE:ident, $EXPR:expr, $SIZE:expr) => {{
match $EXPR {
@@ -460,7 +503,12 @@ macro_rules! build_array_from_option {
}};
($DATA_TYPE:ident, $ENUM:expr, $ENUM2:expr, $ARRAY_TYPE:ident, $EXPR:expr, $SIZE:expr) => {{
match $EXPR {
- Some(value) => Arc::new($ARRAY_TYPE::from_value(*value, $SIZE)),
+ Some(value) => {
+ let array: ArrayRef = Arc::new($ARRAY_TYPE::from_value(*value, $SIZE));
+ // Need to call cast to cast to final data type with timezone/extra param
+ cast(&array, &DataType::$DATA_TYPE($ENUM, $ENUM2))
+ .expect("cannot do temporal cast")
+ }
None => new_null_array(&DataType::$DATA_TYPE($ENUM, $ENUM2), $SIZE),
}
}};
@@ -508,17 +556,17 @@ impl ScalarValue {
ScalarValue::Decimal128(_, precision, scale) => {
DataType::Decimal(*precision, *scale)
}
- ScalarValue::TimestampSecond(_) => {
- DataType::Timestamp(TimeUnit::Second, None)
+ ScalarValue::TimestampSecond(_, tz_opt) => {
+ DataType::Timestamp(TimeUnit::Second, tz_opt.clone())
}
- ScalarValue::TimestampMillisecond(_) => {
- DataType::Timestamp(TimeUnit::Millisecond, None)
+ ScalarValue::TimestampMillisecond(_, tz_opt) => {
+ DataType::Timestamp(TimeUnit::Millisecond, tz_opt.clone())
}
- ScalarValue::TimestampMicrosecond(_) => {
- DataType::Timestamp(TimeUnit::Microsecond, None)
+ ScalarValue::TimestampMicrosecond(_, tz_opt) => {
+ DataType::Timestamp(TimeUnit::Microsecond, tz_opt.clone())
}
- ScalarValue::TimestampNanosecond(_) => {
- DataType::Timestamp(TimeUnit::Nanosecond, None)
+ ScalarValue::TimestampNanosecond(_, tz_opt) => {
+ DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
}
ScalarValue::Float32(_) => DataType::Float32,
ScalarValue::Float64(_) => DataType::Float64,
@@ -583,9 +631,10 @@ impl ScalarValue {
| ScalarValue::Utf8(None)
| ScalarValue::LargeUtf8(None)
| ScalarValue::List(None, _)
- | ScalarValue::TimestampMillisecond(None)
- | ScalarValue::TimestampMicrosecond(None)
- | ScalarValue::TimestampNanosecond(None)
+ | ScalarValue::TimestampSecond(None, _)
+ | ScalarValue::TimestampMillisecond(None, _)
+ | ScalarValue::TimestampMicrosecond(None, _)
+ | ScalarValue::TimestampNanosecond(None, _)
| ScalarValue::Struct(None, _)
| ScalarValue::Decimal128(None, _, _) // For decimal type, the value is null means ScalarValue::Decimal128 is null.
)
@@ -666,6 +715,28 @@ impl ScalarValue {
}};
}
+ macro_rules! build_array_primitive_tz {
+ ($ARRAY_TY:ident, $SCALAR_TY:ident) => {{
+ {
+ let array = scalars
+ .map(|sv| {
+ if let ScalarValue::$SCALAR_TY(v, _) = sv {
+ Ok(v)
+ } else {
+ Err(DataFusionError::Internal(format!(
+ "Inconsistent types in ScalarValue::iter_to_array. \
+ Expected {:?}, got {:?}",
+ data_type, sv
+ )))
+ }
+ })
+ .collect::<Result<$ARRAY_TY>>()?;
+
+ Arc::new(array)
+ }
+ }};
+ }
+
/// Creates an array of $ARRAY_TY by unpacking values of
/// SCALAR_TY for "string-like" types.
macro_rules! build_array_string {
@@ -775,17 +846,17 @@ impl ScalarValue {
DataType::LargeBinary => build_array_string!(LargeBinaryArray, LargeBinary),
DataType::Date32 => build_array_primitive!(Date32Array, Date32),
DataType::Date64 => build_array_primitive!(Date64Array, Date64),
- DataType::Timestamp(TimeUnit::Second, None) => {
- build_array_primitive!(TimestampSecondArray, TimestampSecond)
+ DataType::Timestamp(TimeUnit::Second, _) => {
+ build_array_primitive_tz!(TimestampSecondArray, TimestampSecond)
}
- DataType::Timestamp(TimeUnit::Millisecond, None) => {
- build_array_primitive!(TimestampMillisecondArray, TimestampMillisecond)
+ DataType::Timestamp(TimeUnit::Millisecond, _) => {
+ build_array_primitive_tz!(TimestampMillisecondArray, TimestampMillisecond)
}
- DataType::Timestamp(TimeUnit::Microsecond, None) => {
- build_array_primitive!(TimestampMicrosecondArray, TimestampMicrosecond)
+ DataType::Timestamp(TimeUnit::Microsecond, _) => {
+ build_array_primitive_tz!(TimestampMicrosecondArray, TimestampMicrosecond)
}
- DataType::Timestamp(TimeUnit::Nanosecond, None) => {
- build_array_primitive!(TimestampNanosecondArray, TimestampNanosecond)
+ DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+ build_array_primitive_tz!(TimestampNanosecondArray, TimestampNanosecond)
}
DataType::Interval(IntervalUnit::DayTime) => {
build_array_primitive!(IntervalDayTimeArray, IntervalDayTime)
@@ -1036,35 +1107,35 @@ impl ScalarValue {
ScalarValue::UInt64(e) => {
build_array_from_option!(UInt64, UInt64Array, e, size)
}
- ScalarValue::TimestampSecond(e) => build_array_from_option!(
+ ScalarValue::TimestampSecond(e, tz_opt) => build_array_from_option!(
Timestamp,
TimeUnit::Second,
- None,
+ tz_opt.clone(),
TimestampSecondArray,
e,
size
),
- ScalarValue::TimestampMillisecond(e) => build_array_from_option!(
+ ScalarValue::TimestampMillisecond(e, tz_opt) => build_array_from_option!(
Timestamp,
TimeUnit::Millisecond,
- None,
+ tz_opt.clone(),
TimestampMillisecondArray,
e,
size
),
- ScalarValue::TimestampMicrosecond(e) => build_array_from_option!(
+ ScalarValue::TimestampMicrosecond(e, tz_opt) => build_array_from_option!(
Timestamp,
TimeUnit::Microsecond,
- None,
+ tz_opt.clone(),
TimestampMicrosecondArray,
e,
size
),
- ScalarValue::TimestampNanosecond(e) => build_array_from_option!(
+ ScalarValue::TimestampNanosecond(e, tz_opt) => build_array_from_option!(
Timestamp,
TimeUnit::Nanosecond,
- None,
+ tz_opt.clone(),
TimestampNanosecondArray,
e,
size
@@ -1251,27 +1322,41 @@ impl ScalarValue {
DataType::Date64 => {
typed_cast!(array, index, Date64Array, Date64)
}
- DataType::Timestamp(TimeUnit::Second, _) => {
- typed_cast!(array, index, TimestampSecondArray, TimestampSecond)
+ DataType::Timestamp(TimeUnit::Second, tz_opt) => {
+ typed_cast_tz!(
+ array,
+ index,
+ TimestampSecondArray,
+ TimestampSecond,
+ tz_opt
+ )
}
- DataType::Timestamp(TimeUnit::Millisecond, _) => {
- typed_cast!(
+ DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
+ typed_cast_tz!(
array,
index,
TimestampMillisecondArray,
- TimestampMillisecond
+ TimestampMillisecond,
+ tz_opt
)
}
- DataType::Timestamp(TimeUnit::Microsecond, _) => {
- typed_cast!(
+ DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
+ typed_cast_tz!(
array,
index,
TimestampMicrosecondArray,
- TimestampMicrosecond
+ TimestampMicrosecond,
+ tz_opt
)
}
- DataType::Timestamp(TimeUnit::Nanosecond, _) => {
- typed_cast!(array, index, TimestampNanosecondArray, TimestampNanosecond)
+ DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
+ typed_cast_tz!(
+ array,
+ index,
+ TimestampNanosecondArray,
+ TimestampNanosecond,
+ tz_opt
+ )
}
DataType::Dictionary(index_type, _) => {
let (values, values_index) = match **index_type {
@@ -1407,16 +1492,16 @@ impl ScalarValue {
ScalarValue::Date64(val) => {
eq_array_primitive!(array, index, Date64Array, val)
}
- ScalarValue::TimestampSecond(val) => {
+ ScalarValue::TimestampSecond(val, _) => {
eq_array_primitive!(array, index, TimestampSecondArray, val)
}
- ScalarValue::TimestampMillisecond(val) => {
+ ScalarValue::TimestampMillisecond(val, _) => {
eq_array_primitive!(array, index, TimestampMillisecondArray, val)
}
- ScalarValue::TimestampMicrosecond(val) => {
+ ScalarValue::TimestampMicrosecond(val, _) => {
eq_array_primitive!(array, index, TimestampMicrosecondArray, val)
}
- ScalarValue::TimestampNanosecond(val) => {
+ ScalarValue::TimestampNanosecond(val, _) => {
eq_array_primitive!(array, index, TimestampNanosecondArray, val)
}
ScalarValue::IntervalYearMonth(val) => {
@@ -1565,10 +1650,10 @@ impl TryFrom<ScalarValue> for i64 {
match value {
ScalarValue::Int64(Some(inner_value))
| ScalarValue::Date64(Some(inner_value))
- | ScalarValue::TimestampNanosecond(Some(inner_value))
- | ScalarValue::TimestampMicrosecond(Some(inner_value))
- | ScalarValue::TimestampMillisecond(Some(inner_value))
- | ScalarValue::TimestampSecond(Some(inner_value)) => Ok(inner_value),
+ | ScalarValue::TimestampNanosecond(Some(inner_value), _)
+ | ScalarValue::TimestampMicrosecond(Some(inner_value), _)
+ | ScalarValue::TimestampMillisecond(Some(inner_value), _)
+ | ScalarValue::TimestampSecond(Some(inner_value), _) => Ok(inner_value),
_ => Err(DataFusionError::Internal(format!(
"Cannot convert {:?} to {}",
value,
@@ -1610,17 +1695,17 @@ impl TryFrom<&DataType> for ScalarValue {
DataType::LargeUtf8 => ScalarValue::LargeUtf8(None),
DataType::Date32 => ScalarValue::Date32(None),
DataType::Date64 => ScalarValue::Date64(None),
- DataType::Timestamp(TimeUnit::Second, _) => {
- ScalarValue::TimestampSecond(None)
+ DataType::Timestamp(TimeUnit::Second, tz_opt) => {
+ ScalarValue::TimestampSecond(None, tz_opt.clone())
}
- DataType::Timestamp(TimeUnit::Millisecond, _) => {
- ScalarValue::TimestampMillisecond(None)
+ DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
+ ScalarValue::TimestampMillisecond(None, tz_opt.clone())
}
- DataType::Timestamp(TimeUnit::Microsecond, _) => {
- ScalarValue::TimestampMicrosecond(None)
+ DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
+ ScalarValue::TimestampMicrosecond(None, tz_opt.clone())
}
- DataType::Timestamp(TimeUnit::Nanosecond, _) => {
- ScalarValue::TimestampNanosecond(None)
+ DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
+ ScalarValue::TimestampNanosecond(None, tz_opt.clone())
}
DataType::Dictionary(_index_type, value_type) => {
value_type.as_ref().try_into()?
@@ -1667,10 +1752,10 @@ impl fmt::Display for ScalarValue {
ScalarValue::UInt16(e) => format_option!(f, e)?,
ScalarValue::UInt32(e) => format_option!(f, e)?,
ScalarValue::UInt64(e) => format_option!(f, e)?,
- ScalarValue::TimestampSecond(e) => format_option!(f, e)?,
- ScalarValue::TimestampMillisecond(e) => format_option!(f, e)?,
- ScalarValue::TimestampMicrosecond(e) => format_option!(f, e)?,
- ScalarValue::TimestampNanosecond(e) => format_option!(f, e)?,
+ ScalarValue::TimestampSecond(e, _) => format_option!(f, e)?,
+ ScalarValue::TimestampMillisecond(e, _) => format_option!(f, e)?,
+ ScalarValue::TimestampMicrosecond(e, _) => format_option!(f, e)?,
+ ScalarValue::TimestampNanosecond(e, _) => format_option!(f, e)?,
ScalarValue::Utf8(e) => format_option!(f, e)?,
ScalarValue::LargeUtf8(e) => format_option!(f, e)?,
ScalarValue::Binary(e) => match e {
@@ -1742,15 +1827,17 @@ impl fmt::Debug for ScalarValue {
ScalarValue::UInt16(_) => write!(f, "UInt16({})", self),
ScalarValue::UInt32(_) => write!(f, "UInt32({})", self),
ScalarValue::UInt64(_) => write!(f, "UInt64({})", self),
- ScalarValue::TimestampSecond(_) => write!(f, "TimestampSecond({})", self),
- ScalarValue::TimestampMillisecond(_) => {
- write!(f, "TimestampMillisecond({})", self)
+ ScalarValue::TimestampSecond(_, tz_opt) => {
+ write!(f, "TimestampSecond({}, {:?})", self, tz_opt)
}
- ScalarValue::TimestampMicrosecond(_) => {
- write!(f, "TimestampMicrosecond({})", self)
+ ScalarValue::TimestampMillisecond(_, tz_opt) => {
+ write!(f, "TimestampMillisecond({}, {:?})", self, tz_opt)
}
- ScalarValue::TimestampNanosecond(_) => {
- write!(f, "TimestampNanosecond({})", self)
+ ScalarValue::TimestampMicrosecond(_, tz_opt) => {
+ write!(f, "TimestampMicrosecond({}, {:?})", self, tz_opt)
+ }
+ ScalarValue::TimestampNanosecond(_, tz_opt) => {
+ write!(f, "TimestampNanosecond({}, {:?})", self, tz_opt)
}
ScalarValue::Utf8(None) => write!(f, "Utf8({})", self),
ScalarValue::Utf8(Some(_)) => write!(f, "Utf8(\"{}\")", self),
@@ -1802,25 +1889,25 @@ impl ScalarType<f32> for Float32Type {
impl ScalarType<i64> for TimestampSecondType {
fn scalar(r: Option<i64>) -> ScalarValue {
- ScalarValue::TimestampSecond(r)
+ ScalarValue::TimestampSecond(r, None)
}
}
impl ScalarType<i64> for TimestampMillisecondType {
fn scalar(r: Option<i64>) -> ScalarValue {
- ScalarValue::TimestampMillisecond(r)
+ ScalarValue::TimestampMillisecond(r, None)
}
}
impl ScalarType<i64> for TimestampMicrosecondType {
fn scalar(r: Option<i64>) -> ScalarValue {
- ScalarValue::TimestampMicrosecond(r)
+ ScalarValue::TimestampMicrosecond(r, None)
}
}
impl ScalarType<i64> for TimestampNanosecondType {
fn scalar(r: Option<i64>) -> ScalarValue {
- ScalarValue::TimestampNanosecond(r)
+ ScalarValue::TimestampNanosecond(r, None)
}
}
@@ -2007,6 +2094,23 @@ mod tests {
}};
}
+ /// Like `check_scalar_iter`, but for ScalarValue variants that carry a
+ /// timezone field (each scalar is built here with a `None` timezone).
+ macro_rules! check_scalar_iter_tz {
+ ($SCALAR_T:ident, $ARRAYTYPE:ident, $INPUT:expr) => {{
+ let scalars: Vec<_> = $INPUT
+ .iter()
+ .map(|v| ScalarValue::$SCALAR_T(*v, None))
+ .collect();
+
+ let array = ScalarValue::iter_to_array(scalars.into_iter()).unwrap();
+
+ let expected: ArrayRef = Arc::new($ARRAYTYPE::from($INPUT));
+
+ assert_eq!(&array, &expected);
+ }};
+ }
+
/// Creates array directly and via ScalarValue and ensures they
/// are the same, for string arrays
macro_rules! check_scalar_iter_string {
@@ -2060,22 +2164,22 @@ mod tests {
check_scalar_iter!(UInt32, UInt32Array, vec![Some(1), None, Some(3)]);
check_scalar_iter!(UInt64, UInt64Array, vec![Some(1), None, Some(3)]);
- check_scalar_iter!(
+ check_scalar_iter_tz!(
TimestampSecond,
TimestampSecondArray,
vec![Some(1), None, Some(3)]
);
- check_scalar_iter!(
+ check_scalar_iter_tz!(
TimestampMillisecond,
TimestampMillisecondArray,
vec![Some(1), None, Some(3)]
);
- check_scalar_iter!(
+ check_scalar_iter_tz!(
TimestampMicrosecond,
TimestampMicrosecondArray,
vec![Some(1), None, Some(3)]
);
- check_scalar_iter!(
+ check_scalar_iter_tz!(
TimestampNanosecond,
TimestampNanosecondArray,
vec![Some(1), None, Some(3)]
@@ -2156,6 +2260,10 @@ mod tests {
// Since ScalarValues are used in a non trivial number of places,
// making it larger means significant more memory consumption
// per distinct value.
+ #[cfg(target_arch = "aarch64")]
+ assert_eq!(std::mem::size_of::<ScalarValue>(), 64);
+
+ #[cfg(target_arch = "x86_64")]
assert_eq!(std::mem::size_of::<ScalarValue>(), 48);
}
@@ -2203,6 +2311,17 @@ mod tests {
scalars: $INPUT.iter().map(|v| ScalarValue::$SCALAR_TY(*v)).collect(),
}
}};
+
+ ($INPUT:expr, $ARRAY_TY:ident, $SCALAR_TY:ident, $TZ:expr) => {{
+ let tz = $TZ;
+ TestCase {
+ array: Arc::new($INPUT.iter().collect::<$ARRAY_TY>()),
+ scalars: $INPUT
+ .iter()
+ .map(|v| ScalarValue::$SCALAR_TY(*v, tz.clone()))
+ .collect(),
+ }
+ }};
}
macro_rules! make_str_test_case {
@@ -2267,10 +2386,49 @@ mod tests {
make_binary_test_case!(str_vals, LargeBinaryArray, LargeBinary),
make_test_case!(i32_vals, Date32Array, Date32),
make_test_case!(i64_vals, Date64Array, Date64),
- make_test_case!(i64_vals, TimestampSecondArray, TimestampSecond),
- make_test_case!(i64_vals, TimestampMillisecondArray, TimestampMillisecond),
- make_test_case!(i64_vals, TimestampMicrosecondArray, TimestampMicrosecond),
- make_test_case!(i64_vals, TimestampNanosecondArray, TimestampNanosecond),
+ make_test_case!(i64_vals, TimestampSecondArray, TimestampSecond, None),
+ make_test_case!(
+ i64_vals,
+ TimestampSecondArray,
+ TimestampSecond,
+ Some("UTC".to_owned())
+ ),
+ make_test_case!(
+ i64_vals,
+ TimestampMillisecondArray,
+ TimestampMillisecond,
+ None
+ ),
+ make_test_case!(
+ i64_vals,
+ TimestampMillisecondArray,
+ TimestampMillisecond,
+ Some("UTC".to_owned())
+ ),
+ make_test_case!(
+ i64_vals,
+ TimestampMicrosecondArray,
+ TimestampMicrosecond,
+ None
+ ),
+ make_test_case!(
+ i64_vals,
+ TimestampMicrosecondArray,
+ TimestampMicrosecond,
+ Some("UTC".to_owned())
+ ),
+ make_test_case!(
+ i64_vals,
+ TimestampNanosecondArray,
+ TimestampNanosecond,
+ None
+ ),
+ make_test_case!(
+ i64_vals,
+ TimestampNanosecondArray,
+ TimestampNanosecond,
+ Some("UTC".to_owned())
+ ),
make_test_case!(i32_vals, IntervalYearMonthArray, IntervalYearMonth),
make_test_case!(i64_vals, IntervalDayTimeArray, IntervalDayTime),
make_str_dict_test_case!(str_vals, Int8Type, Utf8),
@@ -2897,4 +3055,30 @@ mod tests {
assert_eq!(array, &expected);
}
+
+ #[test]
+ fn scalar_timestamp_ns_utc_timezone() {
+ let scalar = ScalarValue::TimestampNanosecond(
+ Some(1599566400000000000),
+ Some("UTC".to_owned()),
+ );
+
+ assert_eq!(
+ scalar.get_datatype(),
+ DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned()))
+ );
+
+ let array = scalar.to_array();
+ assert_eq!(array.len(), 1);
+ assert_eq!(
+ array.data_type(),
+ &DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned()))
+ );
+
+ let newscalar = ScalarValue::try_from_array(&array, 0).unwrap();
+ assert_eq!(
+ newscalar.get_datatype(),
+ DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned()))
+ );
+ }
}
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index b72606f..7c3210d 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -4107,34 +4107,41 @@ fn make_timestamp_table<A>() -> Result<Arc<MemTable>>
where
A: ArrowTimestampType,
{
+ make_timestamp_tz_table::<A>(None)
+}
+
+fn make_timestamp_tz_table<A>(tz: Option<String>) -> Result<Arc<MemTable>>
+where
+ A: ArrowTimestampType,
+{
let schema = Arc::new(Schema::new(vec![
- Field::new("ts", DataType::Timestamp(A::get_time_unit(), None), false),
+ Field::new(
+ "ts",
+ DataType::Timestamp(A::get_time_unit(), tz.clone()),
+ false,
+ ),
Field::new("value", DataType::Int32, true),
]));
- let mut builder = PrimitiveBuilder::<A>::new(3);
-
- let nanotimestamps = vec![
- 1599572549190855000i64, // 2020-09-08T13:42:29.190855+00:00
- 1599568949190855000, // 2020-09-08T12:42:29.190855+00:00
- 1599565349190855000, //2020-09-08T11:42:29.190855+00:00
- ]; // 2020-09-08T11:42:29.190855+00:00
let divisor = match A::get_time_unit() {
TimeUnit::Nanosecond => 1,
TimeUnit::Microsecond => 1000,
TimeUnit::Millisecond => 1_000_000,
TimeUnit::Second => 1_000_000_000,
};
- for ts in nanotimestamps {
- builder.append_value(
- <A as ArrowPrimitiveType>::Native::from_i64(ts / divisor).unwrap(),
- )?;
- }
+
+ let timestamps = vec![
+ 1599572549190855000i64 / divisor, // 2020-09-08T13:42:29.190855+00:00
+ 1599568949190855000 / divisor, // 2020-09-08T12:42:29.190855+00:00
+ 1599565349190855000 / divisor, // 2020-09-08T11:42:29.190855+00:00
+ ];
+
+ let array = PrimitiveArray::<A>::from_vec(timestamps, tz);
let data = RecordBatch::try_new(
schema.clone(),
vec![
- Arc::new(builder.finish()),
+ Arc::new(array),
Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
],
)?;
@@ -6615,3 +6622,357 @@ async fn csv_query_with_decimal_by_sql() -> Result<()> {
assert_batches_eq!(expected, &actual);
Ok(())
}
+
+#[tokio::test]
+async fn timestamp_minmax() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ let table_a = make_timestamp_tz_table::<TimestampMillisecondType>(None)?;
+ let table_b =
+ make_timestamp_tz_table::<TimestampNanosecondType>(Some("UTC".to_owned()))?;
+ ctx.register_table("table_a", table_a)?;
+ ctx.register_table("table_b", table_b)?;
+
+ let sql = "SELECT MIN(table_a.ts), MAX(table_b.ts) FROM table_a, table_b";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+-------------------------+----------------------------+",
+ "| MIN(table_a.ts) | MAX(table_b.ts) |",
+ "+-------------------------+----------------------------+",
+ "| 2020-09-08 11:42:29.190 | 2020-09-08 13:42:29.190855 |",
+ "+-------------------------+----------------------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn timestamp_coercion() -> Result<()> {
+ {
+ let mut ctx = ExecutionContext::new();
+ let table_a =
+ make_timestamp_tz_table::<TimestampSecondType>(Some("UTC".to_owned()))?;
+ let table_b =
+ make_timestamp_tz_table::<TimestampMillisecondType>(Some("UTC".to_owned()))?;
+ ctx.register_table("table_a", table_a)?;
+ ctx.register_table("table_b", table_b)?;
+
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------------------+-------------------------+--------------------------+",
+ "| ts | ts | table_a.ts Eq table_b.ts |",
+ "+---------------------+-------------------------+--------------------------+",
+ "| 2020-09-08 13:42:29 | 2020-09-08 13:42:29.190 | true |",
+ "| 2020-09-08 13:42:29 | 2020-09-08 12:42:29.190 | false |",
+ "| 2020-09-08 13:42:29 | 2020-09-08 11:42:29.190 | false |",
+ "| 2020-09-08 12:42:29 | 2020-09-08 13:42:29.190 | false |",
+ "| 2020-09-08 12:42:29 | 2020-09-08 12:42:29.190 | true |",
+ "| 2020-09-08 12:42:29 | 2020-09-08 11:42:29.190 | false |",
+ "| 2020-09-08 11:42:29 | 2020-09-08 13:42:29.190 | false |",
+ "| 2020-09-08 11:42:29 | 2020-09-08 12:42:29.190 | false |",
+ "| 2020-09-08 11:42:29 | 2020-09-08 11:42:29.190 | true |",
+ "+---------------------+-------------------------+--------------------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+
+ {
+ let mut ctx = ExecutionContext::new();
+ let table_a = make_timestamp_table::<TimestampSecondType>()?;
+ let table_b = make_timestamp_table::<TimestampMicrosecondType>()?;
+ ctx.register_table("table_a", table_a)?;
+ ctx.register_table("table_b", table_b)?;
+
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------------------+----------------------------+--------------------------+",
+ "| ts | ts | table_a.ts Eq table_b.ts |",
+ "+---------------------+----------------------------+--------------------------+",
+ "| 2020-09-08 13:42:29 | 2020-09-08 13:42:29.190855 | true |",
+ "| 2020-09-08 13:42:29 | 2020-09-08 12:42:29.190855 | false |",
+ "| 2020-09-08 13:42:29 | 2020-09-08 11:42:29.190855 | false |",
+ "| 2020-09-08 12:42:29 | 2020-09-08 13:42:29.190855 | false |",
+ "| 2020-09-08 12:42:29 | 2020-09-08 12:42:29.190855 | true |",
+ "| 2020-09-08 12:42:29 | 2020-09-08 11:42:29.190855 | false |",
+ "| 2020-09-08 11:42:29 | 2020-09-08 13:42:29.190855 | false |",
+ "| 2020-09-08 11:42:29 | 2020-09-08 12:42:29.190855 | false |",
+ "| 2020-09-08 11:42:29 | 2020-09-08 11:42:29.190855 | true |",
+ "+---------------------+----------------------------+--------------------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+
+ {
+ let mut ctx = ExecutionContext::new();
+ let table_a = make_timestamp_table::<TimestampSecondType>()?;
+ let table_b = make_timestamp_table::<TimestampNanosecondType>()?;
+ ctx.register_table("table_a", table_a)?;
+ ctx.register_table("table_b", table_b)?;
+
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------------------+----------------------------+--------------------------+",
+ "| ts | ts | table_a.ts Eq table_b.ts |",
+ "+---------------------+----------------------------+--------------------------+",
+ "| 2020-09-08 13:42:29 | 2020-09-08 13:42:29.190855 | true |",
+ "| 2020-09-08 13:42:29 | 2020-09-08 12:42:29.190855 | false |",
+ "| 2020-09-08 13:42:29 | 2020-09-08 11:42:29.190855 | false |",
+ "| 2020-09-08 12:42:29 | 2020-09-08 13:42:29.190855 | false |",
+ "| 2020-09-08 12:42:29 | 2020-09-08 12:42:29.190855 | true |",
+ "| 2020-09-08 12:42:29 | 2020-09-08 11:42:29.190855 | false |",
+ "| 2020-09-08 11:42:29 | 2020-09-08 13:42:29.190855 | false |",
+ "| 2020-09-08 11:42:29 | 2020-09-08 12:42:29.190855 | false |",
+ "| 2020-09-08 11:42:29 | 2020-09-08 11:42:29.190855 | true |",
+ "+---------------------+----------------------------+--------------------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+
+ {
+ let mut ctx = ExecutionContext::new();
+ let table_a = make_timestamp_table::<TimestampMillisecondType>()?;
+ let table_b = make_timestamp_table::<TimestampSecondType>()?;
+ ctx.register_table("table_a", table_a)?;
+ ctx.register_table("table_b", table_b)?;
+
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+-------------------------+---------------------+--------------------------+",
+ "| ts | ts | table_a.ts Eq table_b.ts |",
+ "+-------------------------+---------------------+--------------------------+",
+ "| 2020-09-08 13:42:29.190 | 2020-09-08 13:42:29 | true |",
+ "| 2020-09-08 13:42:29.190 | 2020-09-08 12:42:29 | false |",
+ "| 2020-09-08 13:42:29.190 | 2020-09-08 11:42:29 | false |",
+ "| 2020-09-08 12:42:29.190 | 2020-09-08 13:42:29 | false |",
+ "| 2020-09-08 12:42:29.190 | 2020-09-08 12:42:29 | true |",
+ "| 2020-09-08 12:42:29.190 | 2020-09-08 11:42:29 | false |",
+ "| 2020-09-08 11:42:29.190 | 2020-09-08 13:42:29 | false |",
+ "| 2020-09-08 11:42:29.190 | 2020-09-08 12:42:29 | false |",
+ "| 2020-09-08 11:42:29.190 | 2020-09-08 11:42:29 | true |",
+ "+-------------------------+---------------------+--------------------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+
+ {
+ let mut ctx = ExecutionContext::new();
+ let table_a = make_timestamp_table::<TimestampMillisecondType>()?;
+ let table_b = make_timestamp_table::<TimestampMicrosecondType>()?;
+ ctx.register_table("table_a", table_a)?;
+ ctx.register_table("table_b", table_b)?;
+
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+-------------------------+----------------------------+--------------------------+",
+ "| ts | ts | table_a.ts Eq table_b.ts |",
+ "+-------------------------+----------------------------+--------------------------+",
+ "| 2020-09-08 13:42:29.190 | 2020-09-08 13:42:29.190855 | true |",
+ "| 2020-09-08 13:42:29.190 | 2020-09-08 12:42:29.190855 | false |",
+ "| 2020-09-08 13:42:29.190 | 2020-09-08 11:42:29.190855 | false |",
+ "| 2020-09-08 12:42:29.190 | 2020-09-08 13:42:29.190855 | false |",
+ "| 2020-09-08 12:42:29.190 | 2020-09-08 12:42:29.190855 | true |",
+ "| 2020-09-08 12:42:29.190 | 2020-09-08 11:42:29.190855 | false |",
+ "| 2020-09-08 11:42:29.190 | 2020-09-08 13:42:29.190855 | false |",
+ "| 2020-09-08 11:42:29.190 | 2020-09-08 12:42:29.190855 | false |",
+ "| 2020-09-08 11:42:29.190 | 2020-09-08 11:42:29.190855 | true |",
+ "+-------------------------+----------------------------+--------------------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+
+ {
+ let mut ctx = ExecutionContext::new();
+ let table_a = make_timestamp_table::<TimestampMillisecondType>()?;
+ let table_b = make_timestamp_table::<TimestampNanosecondType>()?;
+ ctx.register_table("table_a", table_a)?;
+ ctx.register_table("table_b", table_b)?;
+
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+-------------------------+----------------------------+--------------------------+",
+ "| ts | ts | table_a.ts Eq table_b.ts |",
+ "+-------------------------+----------------------------+--------------------------+",
+ "| 2020-09-08 13:42:29.190 | 2020-09-08 13:42:29.190855 | true |",
+ "| 2020-09-08 13:42:29.190 | 2020-09-08 12:42:29.190855 | false |",
+ "| 2020-09-08 13:42:29.190 | 2020-09-08 11:42:29.190855 | false |",
+ "| 2020-09-08 12:42:29.190 | 2020-09-08 13:42:29.190855 | false |",
+ "| 2020-09-08 12:42:29.190 | 2020-09-08 12:42:29.190855 | true |",
+ "| 2020-09-08 12:42:29.190 | 2020-09-08 11:42:29.190855 | false |",
+ "| 2020-09-08 11:42:29.190 | 2020-09-08 13:42:29.190855 | false |",
+ "| 2020-09-08 11:42:29.190 | 2020-09-08 12:42:29.190855 | false |",
+ "| 2020-09-08 11:42:29.190 | 2020-09-08 11:42:29.190855 | true |",
+ "+-------------------------+----------------------------+--------------------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+
+ {
+ let mut ctx = ExecutionContext::new();
+ let table_a = make_timestamp_table::<TimestampMicrosecondType>()?;
+ let table_b = make_timestamp_table::<TimestampSecondType>()?;
+ ctx.register_table("table_a", table_a)?;
+ ctx.register_table("table_b", table_b)?;
+
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+----------------------------+---------------------+--------------------------+",
+ "| ts | ts | table_a.ts Eq table_b.ts |",
+ "+----------------------------+---------------------+--------------------------+",
+ "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29 | true |",
+ "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29 | false |",
+ "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29 | false |",
+ "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29 | false |",
+ "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29 | true |",
+ "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29 | false |",
+ "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29 | false |",
+ "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29 | false |",
+ "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29 | true |",
+ "+----------------------------+---------------------+--------------------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+
+ {
+ let mut ctx = ExecutionContext::new();
+ let table_a = make_timestamp_table::<TimestampMicrosecondType>()?;
+ let table_b = make_timestamp_table::<TimestampMillisecondType>()?;
+ ctx.register_table("table_a", table_a)?;
+ ctx.register_table("table_b", table_b)?;
+
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+----------------------------+-------------------------+--------------------------+",
+ "| ts | ts | table_a.ts Eq table_b.ts |",
+ "+----------------------------+-------------------------+--------------------------+",
+ "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.190 | true |",
+ "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29.190 | false |",
+ "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29.190 | false |",
+ "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29.190 | false |",
+ "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.190 | true |",
+ "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29.190 | false |",
+ "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29.190 | false |",
+ "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29.190 | false |",
+ "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.190 | true |",
+ "+----------------------------+-------------------------+--------------------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+
+ {
+ let mut ctx = ExecutionContext::new();
+ let table_a = make_timestamp_table::<TimestampMicrosecondType>()?;
+ let table_b = make_timestamp_table::<TimestampNanosecondType>()?;
+ ctx.register_table("table_a", table_a)?;
+ ctx.register_table("table_b", table_b)?;
+
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+----------------------------+----------------------------+--------------------------+",
+ "| ts | ts | table_a.ts Eq table_b.ts |",
+ "+----------------------------+----------------------------+--------------------------+",
+ "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.190855 | true |",
+ "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29.190855 | false |",
+ "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29.190855 | false |",
+ "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29.190855 | false |",
+ "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.190855 | true |",
+ "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29.190855 | false |",
+ "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29.190855 | false |",
+ "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29.190855 | false |",
+ "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.190855 | true |",
+ "+----------------------------+----------------------------+--------------------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+
+ {
+ let mut ctx = ExecutionContext::new();
+ let table_a = make_timestamp_table::<TimestampNanosecondType>()?;
+ let table_b = make_timestamp_table::<TimestampSecondType>()?;
+ ctx.register_table("table_a", table_a)?;
+ ctx.register_table("table_b", table_b)?;
+
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+----------------------------+---------------------+--------------------------+",
+ "| ts | ts | table_a.ts Eq table_b.ts |",
+ "+----------------------------+---------------------+--------------------------+",
+ "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29 | true |",
+ "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29 | false |",
+ "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29 | false |",
+ "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29 | false |",
+ "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29 | true |",
+ "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29 | false |",
+ "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29 | false |",
+ "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29 | false |",
+ "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29 | true |",
+ "+----------------------------+---------------------+--------------------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+
+ {
+ let mut ctx = ExecutionContext::new();
+ let table_a = make_timestamp_table::<TimestampNanosecondType>()?;
+ let table_b = make_timestamp_table::<TimestampMillisecondType>()?;
+ ctx.register_table("table_a", table_a)?;
+ ctx.register_table("table_b", table_b)?;
+
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+----------------------------+-------------------------+--------------------------+",
+ "| ts | ts | table_a.ts Eq table_b.ts |",
+ "+----------------------------+-------------------------+--------------------------+",
+ "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.190 | true |",
+ "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29.190 | false |",
+ "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29.190 | false |",
+ "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29.190 | false |",
+ "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.190 | true |",
+ "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29.190 | false |",
+ "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29.190 | false |",
+ "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29.190 | false |",
+ "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.190 | true |",
+ "+----------------------------+-------------------------+--------------------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+
+ {
+ let mut ctx = ExecutionContext::new();
+ let table_a = make_timestamp_table::<TimestampNanosecondType>()?;
+ let table_b = make_timestamp_table::<TimestampMicrosecondType>()?;
+ ctx.register_table("table_a", table_a)?;
+ ctx.register_table("table_b", table_b)?;
+
+ let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+----------------------------+----------------------------+--------------------------+",
+ "| ts | ts | table_a.ts Eq table_b.ts |",
+ "+----------------------------+----------------------------+--------------------------+",
+ "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.190855 | true |",
+ "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29.190855 | false |",
+ "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29.190855 | false |",
+ "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29.190855 | false |",
+ "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.190855 | true |",
+ "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29.190855 | false |",
+ "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29.190855 | false |",
+ "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29.190855 | false |",
+ "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.190855 | true |",
+ "+----------------------------+----------------------------+--------------------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+
+ Ok(())
+}