You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/12/21 14:19:49 UTC

[arrow-datafusion] branch master updated: Add Timezone to Scalar::Time* types, and better timezone awareness to Datafusion's time types (#1455)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 5668be7  Add Timezone to Scalar::Time* types, and better timezone awareness to Datafusion's time types (#1455)
5668be7 is described below

commit 5668be78a9ccbf5469e9e95ad070920ca5d105ba
Author: Max Burke <ma...@urbanlogiq.com>
AuthorDate: Tue Dec 21 06:19:41 2021 -0800

    Add Timezone to Scalar::Time* types, and better timezone awareness to Datafusion's time types (#1455)
    
    * point to UL repos
    
    * Make ScalarValue::TimestampNanosecond moderately timezone aware
    
    * cargo fmt
    
    * fix ballista build
    
    * fix ballista tests
    
    * ScalarValue is 64 bytes only on aarch64; it is still 48 bytes on amd64
    
    * remove debugging code
    
    * add tests for timestamp coercion
    
    * minmax test on mixed ts types, allow creation of timestamp tables with a timezone, fix a missed case in the binary ops applied to timestamp types with timezones
---
 .../rust/core/src/serde/logical_plan/from_proto.rs |  20 +-
 ballista/rust/core/src/serde/logical_plan/mod.rs   |  18 +-
 .../rust/core/src/serde/logical_plan/to_proto.rs   |   4 +-
 datafusion/src/logical_plan/expr.rs                |  10 +-
 datafusion/src/optimizer/simplify_expressions.rs   |   4 +-
 .../src/physical_plan/datetime_expressions.rs      |   8 +-
 datafusion/src/physical_plan/expressions/binary.rs |  22 +-
 .../src/physical_plan/expressions/coercion.rs      |  37 ++
 .../src/physical_plan/expressions/min_max.rs       |  65 +++-
 datafusion/src/physical_plan/functions.rs          |   5 +-
 datafusion/src/physical_plan/hash_utils.rs         |   2 +-
 datafusion/src/physical_plan/planner.rs            |   1 +
 datafusion/src/physical_plan/sort.rs               |  11 +-
 datafusion/src/scalar.rs                           | 394 +++++++++++++++------
 datafusion/tests/sql.rs                            | 389 +++++++++++++++++++-
 15 files changed, 810 insertions(+), 180 deletions(-)

diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index ba40488..dfac547 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -494,10 +494,10 @@ fn typechecked_scalar_value_conversion(
             ScalarValue::Date32(Some(*v))
         }
         (Value::TimeMicrosecondValue(v), PrimitiveScalarType::TimeMicrosecond) => {
-            ScalarValue::TimestampMicrosecond(Some(*v))
+            ScalarValue::TimestampMicrosecond(Some(*v), None)
         }
         (Value::TimeNanosecondValue(v), PrimitiveScalarType::TimeMicrosecond) => {
-            ScalarValue::TimestampNanosecond(Some(*v))
+            ScalarValue::TimestampNanosecond(Some(*v), None)
         }
         (Value::Utf8Value(v), PrimitiveScalarType::Utf8) => {
             ScalarValue::Utf8(Some(v.to_owned()))
@@ -530,10 +530,10 @@ fn typechecked_scalar_value_conversion(
                     PrimitiveScalarType::LargeUtf8 => ScalarValue::LargeUtf8(None),
                     PrimitiveScalarType::Date32 => ScalarValue::Date32(None),
                     PrimitiveScalarType::TimeMicrosecond => {
-                        ScalarValue::TimestampMicrosecond(None)
+                        ScalarValue::TimestampMicrosecond(None, None)
                     }
                     PrimitiveScalarType::TimeNanosecond => {
-                        ScalarValue::TimestampNanosecond(None)
+                        ScalarValue::TimestampNanosecond(None, None)
                     }
                     PrimitiveScalarType::Null => {
                         return Err(proto_error(
@@ -593,10 +593,10 @@ impl TryInto<datafusion::scalar::ScalarValue> for &protobuf::scalar_value::Value
                 ScalarValue::Date32(Some(*v))
             }
             protobuf::scalar_value::Value::TimeMicrosecondValue(v) => {
-                ScalarValue::TimestampMicrosecond(Some(*v))
+                ScalarValue::TimestampMicrosecond(Some(*v), None)
             }
             protobuf::scalar_value::Value::TimeNanosecondValue(v) => {
-                ScalarValue::TimestampNanosecond(Some(*v))
+                ScalarValue::TimestampNanosecond(Some(*v), None)
             }
             protobuf::scalar_value::Value::ListValue(v) => v.try_into()?,
             protobuf::scalar_value::Value::NullListValue(v) => {
@@ -758,10 +758,10 @@ impl TryInto<datafusion::scalar::ScalarValue> for protobuf::PrimitiveScalarType
             protobuf::PrimitiveScalarType::LargeUtf8 => ScalarValue::LargeUtf8(None),
             protobuf::PrimitiveScalarType::Date32 => ScalarValue::Date32(None),
             protobuf::PrimitiveScalarType::TimeMicrosecond => {
-                ScalarValue::TimestampMicrosecond(None)
+                ScalarValue::TimestampMicrosecond(None, None)
             }
             protobuf::PrimitiveScalarType::TimeNanosecond => {
-                ScalarValue::TimestampNanosecond(None)
+                ScalarValue::TimestampNanosecond(None, None)
             }
         })
     }
@@ -811,10 +811,10 @@ impl TryInto<datafusion::scalar::ScalarValue> for &protobuf::ScalarValue {
                 ScalarValue::Date32(Some(*v))
             }
             protobuf::scalar_value::Value::TimeMicrosecondValue(v) => {
-                ScalarValue::TimestampMicrosecond(Some(*v))
+                ScalarValue::TimestampMicrosecond(Some(*v), None)
             }
             protobuf::scalar_value::Value::TimeNanosecondValue(v) => {
-                ScalarValue::TimestampNanosecond(Some(*v))
+                ScalarValue::TimestampNanosecond(Some(*v), None)
             }
             protobuf::scalar_value::Value::ListValue(scalar_list) => {
                 let protobuf::ScalarListValue {
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index a5e2aa0..a0f481a 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -216,8 +216,8 @@ mod roundtrip_tests {
             ScalarValue::LargeUtf8(None),
             ScalarValue::List(None, Box::new(DataType::Boolean)),
             ScalarValue::Date32(None),
-            ScalarValue::TimestampMicrosecond(None),
-            ScalarValue::TimestampNanosecond(None),
+            ScalarValue::TimestampMicrosecond(None, None),
+            ScalarValue::TimestampNanosecond(None, None),
             ScalarValue::Boolean(Some(true)),
             ScalarValue::Boolean(Some(false)),
             ScalarValue::Float32(Some(1.0)),
@@ -256,11 +256,11 @@ mod roundtrip_tests {
             ScalarValue::LargeUtf8(Some(String::from("Test Large utf8"))),
             ScalarValue::Date32(Some(0)),
             ScalarValue::Date32(Some(i32::MAX)),
-            ScalarValue::TimestampNanosecond(Some(0)),
-            ScalarValue::TimestampNanosecond(Some(i64::MAX)),
-            ScalarValue::TimestampMicrosecond(Some(0)),
-            ScalarValue::TimestampMicrosecond(Some(i64::MAX)),
-            ScalarValue::TimestampMicrosecond(None),
+            ScalarValue::TimestampNanosecond(Some(0), None),
+            ScalarValue::TimestampNanosecond(Some(i64::MAX), None),
+            ScalarValue::TimestampMicrosecond(Some(0), None),
+            ScalarValue::TimestampMicrosecond(Some(i64::MAX), None),
+            ScalarValue::TimestampMicrosecond(None, None),
             ScalarValue::List(
                 Some(Box::new(vec![
                     ScalarValue::Float32(Some(-213.1)),
@@ -619,8 +619,8 @@ mod roundtrip_tests {
             ScalarValue::Utf8(None),
             ScalarValue::LargeUtf8(None),
             ScalarValue::Date32(None),
-            ScalarValue::TimestampMicrosecond(None),
-            ScalarValue::TimestampNanosecond(None),
+            ScalarValue::TimestampMicrosecond(None, None),
+            ScalarValue::TimestampNanosecond(None, None),
             //ScalarValue::List(None, DataType::Boolean)
         ];
 
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 68ed709..47b5df4 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -652,12 +652,12 @@ impl TryFrom<&datafusion::scalar::ScalarValue> for protobuf::ScalarValue {
             datafusion::scalar::ScalarValue::Date32(val) => {
                 create_proto_scalar(val, PrimitiveScalarType::Date32, |s| Value::Date32Value(*s))
             }
-            datafusion::scalar::ScalarValue::TimestampMicrosecond(val) => {
+            datafusion::scalar::ScalarValue::TimestampMicrosecond(val, _) => {
                 create_proto_scalar(val, PrimitiveScalarType::TimeMicrosecond, |s| {
                     Value::TimeMicrosecondValue(*s)
                 })
             }
-            datafusion::scalar::ScalarValue::TimestampNanosecond(val) => {
+            datafusion::scalar::ScalarValue::TimestampNanosecond(val, _) => {
                 create_proto_scalar(val, PrimitiveScalarType::TimeNanosecond, |s| {
                     Value::TimeNanosecondValue(*s)
                 })
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index bcdfae7..fc862cd 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -1478,9 +1478,10 @@ macro_rules! make_timestamp_literal {
         #[doc = $DOC]
         impl TimestampLiteral for $TYPE {
             fn lit_timestamp_nano(&self) -> Expr {
-                Expr::Literal(ScalarValue::TimestampNanosecond(Some(
-                    (self.clone()).into(),
-                )))
+                Expr::Literal(ScalarValue::TimestampNanosecond(
+                    Some((self.clone()).into()),
+                    None,
+                ))
             }
         }
     };
@@ -2048,7 +2049,8 @@ mod tests {
     #[test]
     fn test_lit_timestamp_nano() {
         let expr = col("time").eq(lit_timestamp_nano(10)); // 10 is an implicit i32
-        let expected = col("time").eq(lit(ScalarValue::TimestampNanosecond(Some(10))));
+        let expected =
+            col("time").eq(lit(ScalarValue::TimestampNanosecond(Some(10), None)));
         assert_eq!(expr, expected);
 
         let i: i64 = 10;
diff --git a/datafusion/src/optimizer/simplify_expressions.rs b/datafusion/src/optimizer/simplify_expressions.rs
index 0ca9212..ff2c05c 100644
--- a/datafusion/src/optimizer/simplify_expressions.rs
+++ b/datafusion/src/optimizer/simplify_expressions.rs
@@ -1703,7 +1703,7 @@ mod tests {
             .build()
             .unwrap();
 
-        let expected = "Projection: TimestampNanosecond(1599566400000000000) AS totimestamp(Utf8(\"2020-09-08T12:00:00+00:00\"))\
+        let expected = "Projection: TimestampNanosecond(1599566400000000000, None) AS totimestamp(Utf8(\"2020-09-08T12:00:00+00:00\"))\
             \n  TableScan: test projection=None"
             .to_string();
         let actual = get_optimized_plan_formatted(&plan, &Utc::now());
@@ -1780,7 +1780,7 @@ mod tests {
         // expect the same timestamp appears in both exprs
         let actual = get_optimized_plan_formatted(&plan, &time);
         let expected = format!(
-            "Projection: TimestampNanosecond({}) AS now(), TimestampNanosecond({}) AS t2\
+            "Projection: TimestampNanosecond({}, Some(\"UTC\")) AS now(), TimestampNanosecond({}, Some(\"UTC\")) AS t2\
             \n  TableScan: test projection=None",
             time.timestamp_nanos(),
             time.timestamp_nanos()
diff --git a/datafusion/src/physical_plan/datetime_expressions.rs b/datafusion/src/physical_plan/datetime_expressions.rs
index d103127..6af2f66 100644
--- a/datafusion/src/physical_plan/datetime_expressions.rs
+++ b/datafusion/src/physical_plan/datetime_expressions.rs
@@ -181,6 +181,7 @@ pub fn make_now(
     move |_arg| {
         Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
             now_ts,
+            Some("UTC".to_owned()),
         )))
     }
 }
@@ -240,8 +241,11 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     let f = |x: Option<i64>| x.map(|x| date_trunc_single(granularity, x)).transpose();
 
     Ok(match array {
-        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v)) => {
-            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond((f)(*v)?))
+        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
+            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
+                (f)(*v)?,
+                tz_opt.clone(),
+            ))
         }
         ColumnarValue::Array(array) => {
             let array = array
diff --git a/datafusion/src/physical_plan/expressions/binary.rs b/datafusion/src/physical_plan/expressions/binary.rs
index d8bae7d..bd593fd 100644
--- a/datafusion/src/physical_plan/expressions/binary.rs
+++ b/datafusion/src/physical_plan/expressions/binary.rs
@@ -329,16 +329,16 @@ macro_rules! binary_array_op_scalar {
             DataType::Float32 => compute_op_scalar!($LEFT, $RIGHT, $OP, Float32Array),
             DataType::Float64 => compute_op_scalar!($LEFT, $RIGHT, $OP, Float64Array),
             DataType::Utf8 => compute_utf8_op_scalar!($LEFT, $RIGHT, $OP, StringArray),
-            DataType::Timestamp(TimeUnit::Nanosecond, None) => {
+            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
                 compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
             }
-            DataType::Timestamp(TimeUnit::Microsecond, None) => {
+            DataType::Timestamp(TimeUnit::Microsecond, _) => {
                 compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray)
             }
-            DataType::Timestamp(TimeUnit::Millisecond, None) => {
+            DataType::Timestamp(TimeUnit::Millisecond, _) => {
                 compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMillisecondArray)
             }
-            DataType::Timestamp(TimeUnit::Second, None) => {
+            DataType::Timestamp(TimeUnit::Second, _) => {
                 compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampSecondArray)
             }
             DataType::Date32 => {
@@ -374,16 +374,16 @@ macro_rules! binary_array_op {
             DataType::Float32 => compute_op!($LEFT, $RIGHT, $OP, Float32Array),
             DataType::Float64 => compute_op!($LEFT, $RIGHT, $OP, Float64Array),
             DataType::Utf8 => compute_utf8_op!($LEFT, $RIGHT, $OP, StringArray),
-            DataType::Timestamp(TimeUnit::Nanosecond, None) => {
+            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
                 compute_op!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
             }
-            DataType::Timestamp(TimeUnit::Microsecond, None) => {
+            DataType::Timestamp(TimeUnit::Microsecond, _) => {
                 compute_op!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray)
             }
-            DataType::Timestamp(TimeUnit::Millisecond, None) => {
+            DataType::Timestamp(TimeUnit::Millisecond, _) => {
                 compute_op!($LEFT, $RIGHT, $OP, TimestampMillisecondArray)
             }
-            DataType::Timestamp(TimeUnit::Second, None) => {
+            DataType::Timestamp(TimeUnit::Second, _) => {
                 compute_op!($LEFT, $RIGHT, $OP, TimestampSecondArray)
             }
             DataType::Date32 => {
@@ -541,12 +541,14 @@ fn common_binary_type(
 
     // re-write the error message of failed coercions to include the operator's information
     match result {
-        None => Err(DataFusionError::Plan(
+        None => {
+            Err(DataFusionError::Plan(
             format!(
                 "'{:?} {} {:?}' can't be evaluated because there isn't a common type to coerce the types to",
                 lhs_type, op, rhs_type
             ),
-        )),
+        ))
+        },
         Some(t) => Ok(t)
     }
 }
diff --git a/datafusion/src/physical_plan/expressions/coercion.rs b/datafusion/src/physical_plan/expressions/coercion.rs
index 180b165..a449a8d 100644
--- a/datafusion/src/physical_plan/expressions/coercion.rs
+++ b/datafusion/src/physical_plan/expressions/coercion.rs
@@ -100,11 +100,48 @@ pub fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataTyp
 /// casted to for the purpose of a date computation
 pub fn temporal_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
     use arrow::datatypes::DataType::*;
+    use arrow::datatypes::TimeUnit;
     match (lhs_type, rhs_type) {
         (Utf8, Date32) => Some(Date32),
         (Date32, Utf8) => Some(Date32),
         (Utf8, Date64) => Some(Date64),
         (Date64, Utf8) => Some(Date64),
+        (Timestamp(lhs_unit, lhs_tz), Timestamp(rhs_unit, rhs_tz)) => {
+            let tz = match (lhs_tz, rhs_tz) {
+                // can't cast across timezones
+                (Some(lhs_tz), Some(rhs_tz)) => {
+                    if lhs_tz != rhs_tz {
+                        return None;
+                    } else {
+                        Some(lhs_tz.clone())
+                    }
+                }
+                (Some(lhs_tz), None) => Some(lhs_tz.clone()),
+                (None, Some(rhs_tz)) => Some(rhs_tz.clone()),
+                (None, None) => None,
+            };
+
+            let unit = match (lhs_unit, rhs_unit) {
+                (TimeUnit::Second, TimeUnit::Millisecond) => TimeUnit::Second,
+                (TimeUnit::Second, TimeUnit::Microsecond) => TimeUnit::Second,
+                (TimeUnit::Second, TimeUnit::Nanosecond) => TimeUnit::Second,
+                (TimeUnit::Millisecond, TimeUnit::Second) => TimeUnit::Second,
+                (TimeUnit::Millisecond, TimeUnit::Microsecond) => TimeUnit::Millisecond,
+                (TimeUnit::Millisecond, TimeUnit::Nanosecond) => TimeUnit::Millisecond,
+                (TimeUnit::Microsecond, TimeUnit::Second) => TimeUnit::Second,
+                (TimeUnit::Microsecond, TimeUnit::Millisecond) => TimeUnit::Millisecond,
+                (TimeUnit::Microsecond, TimeUnit::Nanosecond) => TimeUnit::Microsecond,
+                (TimeUnit::Nanosecond, TimeUnit::Second) => TimeUnit::Second,
+                (TimeUnit::Nanosecond, TimeUnit::Millisecond) => TimeUnit::Millisecond,
+                (TimeUnit::Nanosecond, TimeUnit::Microsecond) => TimeUnit::Microsecond,
+                (l, r) => {
+                    assert_eq!(l, r);
+                    l.clone()
+                }
+            };
+
+            Some(Timestamp(unit, tz))
+        }
         _ => None,
     }
 }
diff --git a/datafusion/src/physical_plan/expressions/min_max.rs b/datafusion/src/physical_plan/expressions/min_max.rs
index 2f61881..8f6cd45 100644
--- a/datafusion/src/physical_plan/expressions/min_max.rs
+++ b/datafusion/src/physical_plan/expressions/min_max.rs
@@ -129,6 +129,12 @@ macro_rules! typed_min_max_batch {
         let value = compute::$OP(array);
         ScalarValue::$SCALAR(value)
     }};
+
+    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident, $TZ:expr) => {{
+        let array = $VALUES.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
+        let value = compute::$OP(array);
+        ScalarValue::$SCALAR(value, $TZ.clone())
+    }};
 }
 
 // TODO implement this in arrow-rs with simd
@@ -189,26 +195,35 @@ macro_rules! min_max_batch {
             DataType::UInt32 => typed_min_max_batch!($VALUES, UInt32Array, UInt32, $OP),
             DataType::UInt16 => typed_min_max_batch!($VALUES, UInt16Array, UInt16, $OP),
             DataType::UInt8 => typed_min_max_batch!($VALUES, UInt8Array, UInt8, $OP),
-            DataType::Timestamp(TimeUnit::Second, _) => {
-                typed_min_max_batch!($VALUES, TimestampSecondArray, TimestampSecond, $OP)
+            DataType::Timestamp(TimeUnit::Second, tz_opt) => {
+                typed_min_max_batch!(
+                    $VALUES,
+                    TimestampSecondArray,
+                    TimestampSecond,
+                    $OP,
+                    tz_opt
+                )
             }
-            DataType::Timestamp(TimeUnit::Millisecond, _) => typed_min_max_batch!(
+            DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => typed_min_max_batch!(
                 $VALUES,
                 TimestampMillisecondArray,
                 TimestampMillisecond,
-                $OP
+                $OP,
+                tz_opt
             ),
-            DataType::Timestamp(TimeUnit::Microsecond, _) => typed_min_max_batch!(
+            DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => typed_min_max_batch!(
                 $VALUES,
                 TimestampMicrosecondArray,
                 TimestampMicrosecond,
-                $OP
+                $OP,
+                tz_opt
             ),
-            DataType::Timestamp(TimeUnit::Nanosecond, _) => typed_min_max_batch!(
+            DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => typed_min_max_batch!(
                 $VALUES,
                 TimestampNanosecondArray,
                 TimestampNanosecond,
-                $OP
+                $OP,
+                tz_opt
             ),
             DataType::Date32 => typed_min_max_batch!($VALUES, Date32Array, Date32, $OP),
             DataType::Date64 => typed_min_max_batch!($VALUES, Date64Array, Date64, $OP),
@@ -273,6 +288,18 @@ macro_rules! typed_min_max {
             (Some(a), Some(b)) => Some((*a).$OP(*b)),
         })
     }};
+
+    ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident, $TZ:expr) => {{
+        ScalarValue::$SCALAR(
+            match ($VALUE, $DELTA) {
+                (None, None) => None,
+                (Some(a), None) => Some(a.clone()),
+                (None, Some(b)) => Some(b.clone()),
+                (Some(a), Some(b)) => Some((*a).$OP(*b)),
+            },
+            $TZ.clone(),
+        )
+    }};
 }
 
 // min/max of two scalar string values.
@@ -337,26 +364,26 @@ macro_rules! min_max {
             (ScalarValue::LargeUtf8(lhs), ScalarValue::LargeUtf8(rhs)) => {
                 typed_min_max_string!(lhs, rhs, LargeUtf8, $OP)
             }
-            (ScalarValue::TimestampSecond(lhs), ScalarValue::TimestampSecond(rhs)) => {
-                typed_min_max!(lhs, rhs, TimestampSecond, $OP)
+            (ScalarValue::TimestampSecond(lhs, l_tz), ScalarValue::TimestampSecond(rhs, _)) => {
+                typed_min_max!(lhs, rhs, TimestampSecond, $OP, l_tz)
             }
             (
-                ScalarValue::TimestampMillisecond(lhs),
-                ScalarValue::TimestampMillisecond(rhs),
+                ScalarValue::TimestampMillisecond(lhs, l_tz),
+                ScalarValue::TimestampMillisecond(rhs, _),
             ) => {
-                typed_min_max!(lhs, rhs, TimestampMillisecond, $OP)
+                typed_min_max!(lhs, rhs, TimestampMillisecond, $OP, l_tz)
             }
             (
-                ScalarValue::TimestampMicrosecond(lhs),
-                ScalarValue::TimestampMicrosecond(rhs),
+                ScalarValue::TimestampMicrosecond(lhs, l_tz),
+                ScalarValue::TimestampMicrosecond(rhs, _),
             ) => {
-                typed_min_max!(lhs, rhs, TimestampMicrosecond, $OP)
+                typed_min_max!(lhs, rhs, TimestampMicrosecond, $OP, l_tz)
             }
             (
-                ScalarValue::TimestampNanosecond(lhs),
-                ScalarValue::TimestampNanosecond(rhs),
+                ScalarValue::TimestampNanosecond(lhs, l_tz),
+                ScalarValue::TimestampNanosecond(rhs, _),
             ) => {
-                typed_min_max!(lhs, rhs, TimestampNanosecond, $OP)
+                typed_min_max!(lhs, rhs, TimestampNanosecond, $OP, l_tz)
             }
             (
                 ScalarValue::Date32(lhs),
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index 9c59b96..df073b6 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -612,7 +612,10 @@ pub fn return_type(
         BuiltinScalarFunction::ToTimestampSeconds => {
             Ok(DataType::Timestamp(TimeUnit::Second, None))
         }
-        BuiltinScalarFunction::Now => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
+        BuiltinScalarFunction::Now => Ok(DataType::Timestamp(
+            TimeUnit::Nanosecond,
+            Some("UTC".to_owned()),
+        )),
         BuiltinScalarFunction::Translate => {
             utf8_to_str_type(&input_expr_types[0], "translate")
         }
diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs
index fbd0c97..25d1f3f 100644
--- a/datafusion/src/physical_plan/hash_utils.rs
+++ b/datafusion/src/physical_plan/hash_utils.rs
@@ -369,7 +369,7 @@ pub fn create_hashes<'a>(
                     multi_col
                 );
             }
-            DataType::Timestamp(TimeUnit::Nanosecond, None) => {
+            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
                 hash_array_primitive!(
                     TimestampNanosecondArray,
                     col,
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 1302369..6d913ac 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -632,6 +632,7 @@ impl DefaultPhysicalPlanner {
                     let physical_input = self.create_initial_plan(input, ctx_state).await?;
                     let input_schema = physical_input.as_ref().schema();
                     let input_dfschema = input.as_ref().schema();
+
                     let runtime_expr = self.create_physical_expr(
                         predicate,
                         input_dfschema,
diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs
index 5eb29bb..e8898c1 100644
--- a/datafusion/src/physical_plan/sort.rs
+++ b/datafusion/src/physical_plan/sort.rs
@@ -29,7 +29,7 @@ use crate::physical_plan::{
 };
 pub use arrow::compute::SortOptions;
 use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 use arrow::{array::ArrayRef, error::ArrowError};
@@ -201,6 +201,15 @@ fn sort_batch(
         None,
     )?;
 
+    let schema = Arc::new(Schema::new(
+        schema
+            .fields()
+            .iter()
+            .zip(batch.columns().iter().map(|col| col.data_type()))
+            .map(|(field, ty)| Field::new(field.name(), ty.clone(), field.is_nullable()))
+            .collect::<Vec<_>>(),
+    ));
+
     // reorder all rows based on sorted indices
     RecordBatch::try_new(
         schema,
diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs
index 35ebb2a..cdcf11e 100644
--- a/datafusion/src/scalar.rs
+++ b/datafusion/src/scalar.rs
@@ -20,6 +20,7 @@
 use crate::error::{DataFusionError, Result};
 use arrow::{
     array::*,
+    compute::kernels::cast::cast,
     datatypes::{
         ArrowDictionaryKeyType, ArrowNativeType, DataType, Field, Float32Type,
         Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, IntervalUnit, TimeUnit,
@@ -82,13 +83,13 @@ pub enum ScalarValue {
     /// Date stored as a signed 64bit int
     Date64(Option<i64>),
     /// Timestamp Second
-    TimestampSecond(Option<i64>),
+    TimestampSecond(Option<i64>, Option<String>),
     /// Timestamp Milliseconds
-    TimestampMillisecond(Option<i64>),
+    TimestampMillisecond(Option<i64>, Option<String>),
     /// Timestamp Microseconds
-    TimestampMicrosecond(Option<i64>),
+    TimestampMicrosecond(Option<i64>, Option<String>),
     /// Timestamp Nanoseconds
-    TimestampNanosecond(Option<i64>),
+    TimestampNanosecond(Option<i64>, Option<String>),
     /// Interval with YearMonth unit
     IntervalYearMonth(Option<i32>),
     /// Interval with DayTime unit
@@ -155,14 +156,14 @@ impl PartialEq for ScalarValue {
             (Date32(_), _) => false,
             (Date64(v1), Date64(v2)) => v1.eq(v2),
             (Date64(_), _) => false,
-            (TimestampSecond(v1), TimestampSecond(v2)) => v1.eq(v2),
-            (TimestampSecond(_), _) => false,
-            (TimestampMillisecond(v1), TimestampMillisecond(v2)) => v1.eq(v2),
-            (TimestampMillisecond(_), _) => false,
-            (TimestampMicrosecond(v1), TimestampMicrosecond(v2)) => v1.eq(v2),
-            (TimestampMicrosecond(_), _) => false,
-            (TimestampNanosecond(v1), TimestampNanosecond(v2)) => v1.eq(v2),
-            (TimestampNanosecond(_), _) => false,
+            (TimestampSecond(v1, _), TimestampSecond(v2, _)) => v1.eq(v2),
+            (TimestampSecond(_, _), _) => false,
+            (TimestampMillisecond(v1, _), TimestampMillisecond(v2, _)) => v1.eq(v2),
+            (TimestampMillisecond(_, _), _) => false,
+            (TimestampMicrosecond(v1, _), TimestampMicrosecond(v2, _)) => v1.eq(v2),
+            (TimestampMicrosecond(_, _), _) => false,
+            (TimestampNanosecond(v1, _), TimestampNanosecond(v2, _)) => v1.eq(v2),
+            (TimestampNanosecond(_, _), _) => false,
             (IntervalYearMonth(v1), IntervalYearMonth(v2)) => v1.eq(v2),
             (IntervalYearMonth(_), _) => false,
             (IntervalDayTime(v1), IntervalDayTime(v2)) => v1.eq(v2),
@@ -241,14 +242,20 @@ impl PartialOrd for ScalarValue {
             (Date32(_), _) => None,
             (Date64(v1), Date64(v2)) => v1.partial_cmp(v2),
             (Date64(_), _) => None,
-            (TimestampSecond(v1), TimestampSecond(v2)) => v1.partial_cmp(v2),
-            (TimestampSecond(_), _) => None,
-            (TimestampMillisecond(v1), TimestampMillisecond(v2)) => v1.partial_cmp(v2),
-            (TimestampMillisecond(_), _) => None,
-            (TimestampMicrosecond(v1), TimestampMicrosecond(v2)) => v1.partial_cmp(v2),
-            (TimestampMicrosecond(_), _) => None,
-            (TimestampNanosecond(v1), TimestampNanosecond(v2)) => v1.partial_cmp(v2),
-            (TimestampNanosecond(_), _) => None,
+            (TimestampSecond(v1, _), TimestampSecond(v2, _)) => v1.partial_cmp(v2),
+            (TimestampSecond(_, _), _) => None,
+            (TimestampMillisecond(v1, _), TimestampMillisecond(v2, _)) => {
+                v1.partial_cmp(v2)
+            }
+            (TimestampMillisecond(_, _), _) => None,
+            (TimestampMicrosecond(v1, _), TimestampMicrosecond(v2, _)) => {
+                v1.partial_cmp(v2)
+            }
+            (TimestampMicrosecond(_, _), _) => None,
+            (TimestampNanosecond(v1, _), TimestampNanosecond(v2, _)) => {
+                v1.partial_cmp(v2)
+            }
+            (TimestampNanosecond(_, _), _) => None,
             (IntervalYearMonth(v1), IntervalYearMonth(v2)) => v1.partial_cmp(v2),
             (IntervalYearMonth(_), _) => None,
             (IntervalDayTime(v1), IntervalDayTime(v2)) => v1.partial_cmp(v2),
@@ -305,10 +312,10 @@ impl std::hash::Hash for ScalarValue {
             }
             Date32(v) => v.hash(state),
             Date64(v) => v.hash(state),
-            TimestampSecond(v) => v.hash(state),
-            TimestampMillisecond(v) => v.hash(state),
-            TimestampMicrosecond(v) => v.hash(state),
-            TimestampNanosecond(v) => v.hash(state),
+            TimestampSecond(v, _) => v.hash(state),
+            TimestampMillisecond(v, _) => v.hash(state),
+            TimestampMicrosecond(v, _) => v.hash(state),
+            TimestampNanosecond(v, _) => v.hash(state),
             IntervalYearMonth(v) => v.hash(state),
             IntervalDayTime(v) => v.hash(state),
             Struct(v, t) => {
@@ -344,6 +351,19 @@ fn get_dict_value<K: ArrowDictionaryKeyType>(
     Ok((dict_array.values(), Some(values_index)))
 }
 
+macro_rules! typed_cast_tz {
+    ($array:expr, $index:expr, $ARRAYTYPE:ident, $SCALAR:ident, $TZ:expr) => {{
+        // Panics unless $array is a $ARRAYTYPE; a null slot becomes None.
+        let typed = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
+        let value = if typed.is_null($index) {
+            None
+        } else {
+            Some(typed.value($index).into())
+        };
+        ScalarValue::$SCALAR(value, $TZ.clone())
+    }};
+}
+
 macro_rules! typed_cast {
     ($array:expr, $index:expr, $ARRAYTYPE:ident, $SCALAR:ident) => {{
         let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
@@ -392,25 +412,25 @@ macro_rules! build_timestamp_list {
             Some(values) => {
                 let values = values.as_ref();
                 match $TIME_UNIT {
-                    TimeUnit::Second => build_values_list!(
+                    TimeUnit::Second => build_values_list_tz!(
                         TimestampSecondBuilder,
                         TimestampSecond,
                         values,
                         $SIZE
                     ),
-                    TimeUnit::Microsecond => build_values_list!(
+                    TimeUnit::Millisecond => build_values_list_tz!(
                         TimestampMillisecondBuilder,
                         TimestampMillisecond,
                         values,
                         $SIZE
                     ),
-                    TimeUnit::Millisecond => build_values_list!(
+                    TimeUnit::Microsecond => build_values_list_tz!(
                         TimestampMicrosecondBuilder,
                         TimestampMicrosecond,
                         values,
                         $SIZE
                     ),
-                    TimeUnit::Nanosecond => build_values_list!(
+                    TimeUnit::Nanosecond => build_values_list_tz!(
                         TimestampNanosecondBuilder,
                         TimestampNanosecond,
                         values,
@@ -445,6 +465,29 @@ macro_rules! build_values_list {
     }};
 }
 
+macro_rules! build_values_list_tz {
+    ($VALUE_BUILDER_TY:ident, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr) => {{
+        // Emits $SIZE list rows, each holding every scalar in $VALUES; the
+        // timezone stored in each scalar is not used.
+        let mut list = ListBuilder::new($VALUE_BUILDER_TY::new($VALUES.len()));
+        for _ in 0..$SIZE {
+            for item in $VALUES {
+                // Any other variant cannot go into this builder.
+                let slot = match item {
+                    ScalarValue::$SCALAR_TY(slot, _) => slot,
+                    _ => panic!("Incompatible ScalarValue for list"),
+                };
+                match slot {
+                    Some(v) => list.values().append_value(v.clone()).unwrap(),
+                    None => list.values().append_null().unwrap(),
+                }
+            }
+            list.append(true).unwrap();
+        }
+        list.finish()
+    }};
+}
+
 macro_rules! build_array_from_option {
     ($DATA_TYPE:ident, $ARRAY_TYPE:ident, $EXPR:expr, $SIZE:expr) => {{
         match $EXPR {
@@ -460,7 +503,12 @@ macro_rules! build_array_from_option {
     }};
     ($DATA_TYPE:ident, $ENUM:expr, $ENUM2:expr, $ARRAY_TYPE:ident, $EXPR:expr, $SIZE:expr) => {{
         match $EXPR {
-            Some(value) => Arc::new($ARRAY_TYPE::from_value(*value, $SIZE)),
+            Some(value) => {
+                let array: ArrayRef = Arc::new($ARRAY_TYPE::from_value(*value, $SIZE));
+                // Need to call cast to cast to final data type with timezone/extra param
+                cast(&array, &DataType::$DATA_TYPE($ENUM, $ENUM2))
+                    .expect("cannot do temporal cast")
+            }
             None => new_null_array(&DataType::$DATA_TYPE($ENUM, $ENUM2), $SIZE),
         }
     }};
@@ -508,17 +556,17 @@ impl ScalarValue {
             ScalarValue::Decimal128(_, precision, scale) => {
                 DataType::Decimal(*precision, *scale)
             }
-            ScalarValue::TimestampSecond(_) => {
-                DataType::Timestamp(TimeUnit::Second, None)
+            ScalarValue::TimestampSecond(_, tz_opt) => {
+                DataType::Timestamp(TimeUnit::Second, tz_opt.clone())
             }
-            ScalarValue::TimestampMillisecond(_) => {
-                DataType::Timestamp(TimeUnit::Millisecond, None)
+            ScalarValue::TimestampMillisecond(_, tz_opt) => {
+                DataType::Timestamp(TimeUnit::Millisecond, tz_opt.clone())
             }
-            ScalarValue::TimestampMicrosecond(_) => {
-                DataType::Timestamp(TimeUnit::Microsecond, None)
+            ScalarValue::TimestampMicrosecond(_, tz_opt) => {
+                DataType::Timestamp(TimeUnit::Microsecond, tz_opt.clone())
             }
-            ScalarValue::TimestampNanosecond(_) => {
-                DataType::Timestamp(TimeUnit::Nanosecond, None)
+            ScalarValue::TimestampNanosecond(_, tz_opt) => {
+                DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
             }
             ScalarValue::Float32(_) => DataType::Float32,
             ScalarValue::Float64(_) => DataType::Float64,
@@ -583,9 +631,10 @@ impl ScalarValue {
                 | ScalarValue::Utf8(None)
                 | ScalarValue::LargeUtf8(None)
                 | ScalarValue::List(None, _)
-                | ScalarValue::TimestampMillisecond(None)
-                | ScalarValue::TimestampMicrosecond(None)
-                | ScalarValue::TimestampNanosecond(None)
+                | ScalarValue::TimestampSecond(None, _)
+                | ScalarValue::TimestampMillisecond(None, _)
+                | ScalarValue::TimestampMicrosecond(None, _)
+                | ScalarValue::TimestampNanosecond(None, _)
                 | ScalarValue::Struct(None, _)
                 | ScalarValue::Decimal128(None, _, _) // For decimal type, the value is null means ScalarValue::Decimal128 is null.
         )
@@ -666,6 +715,28 @@ impl ScalarValue {
             }};
         }
 
+        /// Creates an array of $ARRAY_TY by unpacking values of $SCALAR_TY,
+        /// for scalar variants whose second field is a timezone.
+        macro_rules! build_array_primitive_tz {
+            ($ARRAY_TY:ident, $SCALAR_TY:ident) => {{
+                {
+                    // The timezone is dropped; only the Option value is collected.
+                    let values = scalars
+                        .map(|sv| match sv {
+                            ScalarValue::$SCALAR_TY(v, _) => Ok(v),
+                            other => Err(DataFusionError::Internal(format!(
+                                "Inconsistent types in ScalarValue::iter_to_array. \
+                                 Expected {:?}, got {:?}",
+                                data_type, other
+                            ))),
+                        })
+                        .collect::<Result<$ARRAY_TY>>()?;
+
+                    Arc::new(values)
+                }
+            }};
+        }
+
         /// Creates an array of $ARRAY_TY by unpacking values of
         /// SCALAR_TY for "string-like" types.
         macro_rules! build_array_string {
@@ -775,17 +846,17 @@ impl ScalarValue {
             DataType::LargeBinary => build_array_string!(LargeBinaryArray, LargeBinary),
             DataType::Date32 => build_array_primitive!(Date32Array, Date32),
             DataType::Date64 => build_array_primitive!(Date64Array, Date64),
-            DataType::Timestamp(TimeUnit::Second, None) => {
-                build_array_primitive!(TimestampSecondArray, TimestampSecond)
+            DataType::Timestamp(TimeUnit::Second, _) => {
+                build_array_primitive_tz!(TimestampSecondArray, TimestampSecond)
             }
-            DataType::Timestamp(TimeUnit::Millisecond, None) => {
-                build_array_primitive!(TimestampMillisecondArray, TimestampMillisecond)
+            DataType::Timestamp(TimeUnit::Millisecond, _) => {
+                build_array_primitive_tz!(TimestampMillisecondArray, TimestampMillisecond)
             }
-            DataType::Timestamp(TimeUnit::Microsecond, None) => {
-                build_array_primitive!(TimestampMicrosecondArray, TimestampMicrosecond)
+            DataType::Timestamp(TimeUnit::Microsecond, _) => {
+                build_array_primitive_tz!(TimestampMicrosecondArray, TimestampMicrosecond)
             }
-            DataType::Timestamp(TimeUnit::Nanosecond, None) => {
-                build_array_primitive!(TimestampNanosecondArray, TimestampNanosecond)
+            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+                build_array_primitive_tz!(TimestampNanosecondArray, TimestampNanosecond)
             }
             DataType::Interval(IntervalUnit::DayTime) => {
                 build_array_primitive!(IntervalDayTimeArray, IntervalDayTime)
@@ -1036,35 +1107,35 @@ impl ScalarValue {
             ScalarValue::UInt64(e) => {
                 build_array_from_option!(UInt64, UInt64Array, e, size)
             }
-            ScalarValue::TimestampSecond(e) => build_array_from_option!(
+            ScalarValue::TimestampSecond(e, tz_opt) => build_array_from_option!(
                 Timestamp,
                 TimeUnit::Second,
-                None,
+                tz_opt.clone(),
                 TimestampSecondArray,
                 e,
                 size
             ),
-            ScalarValue::TimestampMillisecond(e) => build_array_from_option!(
+            ScalarValue::TimestampMillisecond(e, tz_opt) => build_array_from_option!(
                 Timestamp,
                 TimeUnit::Millisecond,
-                None,
+                tz_opt.clone(),
                 TimestampMillisecondArray,
                 e,
                 size
             ),
 
-            ScalarValue::TimestampMicrosecond(e) => build_array_from_option!(
+            ScalarValue::TimestampMicrosecond(e, tz_opt) => build_array_from_option!(
                 Timestamp,
                 TimeUnit::Microsecond,
-                None,
+                tz_opt.clone(),
                 TimestampMicrosecondArray,
                 e,
                 size
             ),
-            ScalarValue::TimestampNanosecond(e) => build_array_from_option!(
+            ScalarValue::TimestampNanosecond(e, tz_opt) => build_array_from_option!(
                 Timestamp,
                 TimeUnit::Nanosecond,
-                None,
+                tz_opt.clone(),
                 TimestampNanosecondArray,
                 e,
                 size
@@ -1251,27 +1322,41 @@ impl ScalarValue {
             DataType::Date64 => {
                 typed_cast!(array, index, Date64Array, Date64)
             }
-            DataType::Timestamp(TimeUnit::Second, _) => {
-                typed_cast!(array, index, TimestampSecondArray, TimestampSecond)
+            DataType::Timestamp(TimeUnit::Second, tz_opt) => {
+                typed_cast_tz!(
+                    array,
+                    index,
+                    TimestampSecondArray,
+                    TimestampSecond,
+                    tz_opt
+                )
             }
-            DataType::Timestamp(TimeUnit::Millisecond, _) => {
-                typed_cast!(
+            DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
+                typed_cast_tz!(
                     array,
                     index,
                     TimestampMillisecondArray,
-                    TimestampMillisecond
+                    TimestampMillisecond,
+                    tz_opt
                 )
             }
-            DataType::Timestamp(TimeUnit::Microsecond, _) => {
-                typed_cast!(
+            DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
+                typed_cast_tz!(
                     array,
                     index,
                     TimestampMicrosecondArray,
-                    TimestampMicrosecond
+                    TimestampMicrosecond,
+                    tz_opt
                 )
             }
-            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
-                typed_cast!(array, index, TimestampNanosecondArray, TimestampNanosecond)
+            DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
+                typed_cast_tz!(
+                    array,
+                    index,
+                    TimestampNanosecondArray,
+                    TimestampNanosecond,
+                    tz_opt
+                )
             }
             DataType::Dictionary(index_type, _) => {
                 let (values, values_index) = match **index_type {
@@ -1407,16 +1492,16 @@ impl ScalarValue {
             ScalarValue::Date64(val) => {
                 eq_array_primitive!(array, index, Date64Array, val)
             }
-            ScalarValue::TimestampSecond(val) => {
+            ScalarValue::TimestampSecond(val, _) => {
                 eq_array_primitive!(array, index, TimestampSecondArray, val)
             }
-            ScalarValue::TimestampMillisecond(val) => {
+            ScalarValue::TimestampMillisecond(val, _) => {
                 eq_array_primitive!(array, index, TimestampMillisecondArray, val)
             }
-            ScalarValue::TimestampMicrosecond(val) => {
+            ScalarValue::TimestampMicrosecond(val, _) => {
                 eq_array_primitive!(array, index, TimestampMicrosecondArray, val)
             }
-            ScalarValue::TimestampNanosecond(val) => {
+            ScalarValue::TimestampNanosecond(val, _) => {
                 eq_array_primitive!(array, index, TimestampNanosecondArray, val)
             }
             ScalarValue::IntervalYearMonth(val) => {
@@ -1565,10 +1650,10 @@ impl TryFrom<ScalarValue> for i64 {
         match value {
             ScalarValue::Int64(Some(inner_value))
             | ScalarValue::Date64(Some(inner_value))
-            | ScalarValue::TimestampNanosecond(Some(inner_value))
-            | ScalarValue::TimestampMicrosecond(Some(inner_value))
-            | ScalarValue::TimestampMillisecond(Some(inner_value))
-            | ScalarValue::TimestampSecond(Some(inner_value)) => Ok(inner_value),
+            | ScalarValue::TimestampNanosecond(Some(inner_value), _)
+            | ScalarValue::TimestampMicrosecond(Some(inner_value), _)
+            | ScalarValue::TimestampMillisecond(Some(inner_value), _)
+            | ScalarValue::TimestampSecond(Some(inner_value), _) => Ok(inner_value),
             _ => Err(DataFusionError::Internal(format!(
                 "Cannot convert {:?} to {}",
                 value,
@@ -1610,17 +1695,17 @@ impl TryFrom<&DataType> for ScalarValue {
             DataType::LargeUtf8 => ScalarValue::LargeUtf8(None),
             DataType::Date32 => ScalarValue::Date32(None),
             DataType::Date64 => ScalarValue::Date64(None),
-            DataType::Timestamp(TimeUnit::Second, _) => {
-                ScalarValue::TimestampSecond(None)
+            DataType::Timestamp(TimeUnit::Second, tz_opt) => {
+                ScalarValue::TimestampSecond(None, tz_opt.clone())
             }
-            DataType::Timestamp(TimeUnit::Millisecond, _) => {
-                ScalarValue::TimestampMillisecond(None)
+            DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
+                ScalarValue::TimestampMillisecond(None, tz_opt.clone())
             }
-            DataType::Timestamp(TimeUnit::Microsecond, _) => {
-                ScalarValue::TimestampMicrosecond(None)
+            DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
+                ScalarValue::TimestampMicrosecond(None, tz_opt.clone())
             }
-            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
-                ScalarValue::TimestampNanosecond(None)
+            DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
+                ScalarValue::TimestampNanosecond(None, tz_opt.clone())
             }
             DataType::Dictionary(_index_type, value_type) => {
                 value_type.as_ref().try_into()?
@@ -1667,10 +1752,10 @@ impl fmt::Display for ScalarValue {
             ScalarValue::UInt16(e) => format_option!(f, e)?,
             ScalarValue::UInt32(e) => format_option!(f, e)?,
             ScalarValue::UInt64(e) => format_option!(f, e)?,
-            ScalarValue::TimestampSecond(e) => format_option!(f, e)?,
-            ScalarValue::TimestampMillisecond(e) => format_option!(f, e)?,
-            ScalarValue::TimestampMicrosecond(e) => format_option!(f, e)?,
-            ScalarValue::TimestampNanosecond(e) => format_option!(f, e)?,
+            ScalarValue::TimestampSecond(e, _) => format_option!(f, e)?,
+            ScalarValue::TimestampMillisecond(e, _) => format_option!(f, e)?,
+            ScalarValue::TimestampMicrosecond(e, _) => format_option!(f, e)?,
+            ScalarValue::TimestampNanosecond(e, _) => format_option!(f, e)?,
             ScalarValue::Utf8(e) => format_option!(f, e)?,
             ScalarValue::LargeUtf8(e) => format_option!(f, e)?,
             ScalarValue::Binary(e) => match e {
@@ -1742,15 +1827,17 @@ impl fmt::Debug for ScalarValue {
             ScalarValue::UInt16(_) => write!(f, "UInt16({})", self),
             ScalarValue::UInt32(_) => write!(f, "UInt32({})", self),
             ScalarValue::UInt64(_) => write!(f, "UInt64({})", self),
-            ScalarValue::TimestampSecond(_) => write!(f, "TimestampSecond({})", self),
-            ScalarValue::TimestampMillisecond(_) => {
-                write!(f, "TimestampMillisecond({})", self)
+            ScalarValue::TimestampSecond(_, tz_opt) => {
+                write!(f, "TimestampSecond({}, {:?})", self, tz_opt)
             }
-            ScalarValue::TimestampMicrosecond(_) => {
-                write!(f, "TimestampMicrosecond({})", self)
+            ScalarValue::TimestampMillisecond(_, tz_opt) => {
+                write!(f, "TimestampMillisecond({}, {:?})", self, tz_opt)
             }
-            ScalarValue::TimestampNanosecond(_) => {
-                write!(f, "TimestampNanosecond({})", self)
+            ScalarValue::TimestampMicrosecond(_, tz_opt) => {
+                write!(f, "TimestampMicrosecond({}, {:?})", self, tz_opt)
+            }
+            ScalarValue::TimestampNanosecond(_, tz_opt) => {
+                write!(f, "TimestampNanosecond({}, {:?})", self, tz_opt)
             }
             ScalarValue::Utf8(None) => write!(f, "Utf8({})", self),
             ScalarValue::Utf8(Some(_)) => write!(f, "Utf8(\"{}\")", self),
@@ -1802,25 +1889,25 @@ impl ScalarType<f32> for Float32Type {
 
 impl ScalarType<i64> for TimestampSecondType {
     fn scalar(r: Option<i64>) -> ScalarValue {
-        ScalarValue::TimestampSecond(r)
+        ScalarValue::TimestampSecond(r, None)
     }
 }
 
 impl ScalarType<i64> for TimestampMillisecondType {
     fn scalar(r: Option<i64>) -> ScalarValue {
-        ScalarValue::TimestampMillisecond(r)
+        ScalarValue::TimestampMillisecond(r, None)
     }
 }
 
 impl ScalarType<i64> for TimestampMicrosecondType {
     fn scalar(r: Option<i64>) -> ScalarValue {
-        ScalarValue::TimestampMicrosecond(r)
+        ScalarValue::TimestampMicrosecond(r, None)
     }
 }
 
 impl ScalarType<i64> for TimestampNanosecondType {
     fn scalar(r: Option<i64>) -> ScalarValue {
-        ScalarValue::TimestampNanosecond(r)
+        ScalarValue::TimestampNanosecond(r, None)
     }
 }
 
@@ -2007,6 +2094,23 @@ mod tests {
         }};
     }
 
+    /// Builds one array straight from $INPUT and another by routing the same
+    /// values through timezone-carrying scalars (tz = None), then compares them.
+    macro_rules! check_scalar_iter_tz {
+        ($SCALAR_T:ident, $ARRAYTYPE:ident, $INPUT:expr) => {{
+            let mut scalars = Vec::new();
+            for v in $INPUT.iter() {
+                scalars.push(ScalarValue::$SCALAR_T(*v, None));
+            }
+
+            let array = ScalarValue::iter_to_array(scalars.into_iter()).unwrap();
+
+            let expected: ArrayRef = Arc::new($ARRAYTYPE::from($INPUT));
+
+            assert_eq!(&array, &expected);
+        }};
+    }
+
     /// Creates array directly and via ScalarValue and ensures they
     /// are the same, for string  arrays
     macro_rules! check_scalar_iter_string {
@@ -2060,22 +2164,22 @@ mod tests {
         check_scalar_iter!(UInt32, UInt32Array, vec![Some(1), None, Some(3)]);
         check_scalar_iter!(UInt64, UInt64Array, vec![Some(1), None, Some(3)]);
 
-        check_scalar_iter!(
+        check_scalar_iter_tz!(
             TimestampSecond,
             TimestampSecondArray,
             vec![Some(1), None, Some(3)]
         );
-        check_scalar_iter!(
+        check_scalar_iter_tz!(
             TimestampMillisecond,
             TimestampMillisecondArray,
             vec![Some(1), None, Some(3)]
         );
-        check_scalar_iter!(
+        check_scalar_iter_tz!(
             TimestampMicrosecond,
             TimestampMicrosecondArray,
             vec![Some(1), None, Some(3)]
         );
-        check_scalar_iter!(
+        check_scalar_iter_tz!(
             TimestampNanosecond,
             TimestampNanosecondArray,
             vec![Some(1), None, Some(3)]
@@ -2156,6 +2260,10 @@ mod tests {
         // Since ScalarValues are used in a non trivial number of places,
         // making it larger means significant more memory consumption
         // per distinct value.
+        #[cfg(target_arch = "aarch64")]
+        assert_eq!(std::mem::size_of::<ScalarValue>(), 64);
+
+        #[cfg(target_arch = "x86_64")] // rustc never sets "amd64"; the arch name is x86_64
         assert_eq!(std::mem::size_of::<ScalarValue>(), 48);
     }
 
@@ -2203,6 +2311,17 @@ mod tests {
                     scalars: $INPUT.iter().map(|v| ScalarValue::$SCALAR_TY(*v)).collect(),
                 }
             }};
+
+            ($INPUT:expr, $ARRAY_TY:ident, $SCALAR_TY:ident, $TZ:expr) => {{
+                let tz = $TZ;
+                TestCase {
+                    array: Arc::new($INPUT.iter().collect::<$ARRAY_TY>()),
+                    scalars: $INPUT
+                        .iter()
+                        .map(|v| ScalarValue::$SCALAR_TY(*v, tz.clone()))
+                        .collect(),
+                }
+            }};
         }
 
         macro_rules! make_str_test_case {
@@ -2267,10 +2386,49 @@ mod tests {
             make_binary_test_case!(str_vals, LargeBinaryArray, LargeBinary),
             make_test_case!(i32_vals, Date32Array, Date32),
             make_test_case!(i64_vals, Date64Array, Date64),
-            make_test_case!(i64_vals, TimestampSecondArray, TimestampSecond),
-            make_test_case!(i64_vals, TimestampMillisecondArray, TimestampMillisecond),
-            make_test_case!(i64_vals, TimestampMicrosecondArray, TimestampMicrosecond),
-            make_test_case!(i64_vals, TimestampNanosecondArray, TimestampNanosecond),
+            make_test_case!(i64_vals, TimestampSecondArray, TimestampSecond, None),
+            make_test_case!(
+                i64_vals,
+                TimestampSecondArray,
+                TimestampSecond,
+                Some("UTC".to_owned())
+            ),
+            make_test_case!(
+                i64_vals,
+                TimestampMillisecondArray,
+                TimestampMillisecond,
+                None
+            ),
+            make_test_case!(
+                i64_vals,
+                TimestampMillisecondArray,
+                TimestampMillisecond,
+                Some("UTC".to_owned())
+            ),
+            make_test_case!(
+                i64_vals,
+                TimestampMicrosecondArray,
+                TimestampMicrosecond,
+                None
+            ),
+            make_test_case!(
+                i64_vals,
+                TimestampMicrosecondArray,
+                TimestampMicrosecond,
+                Some("UTC".to_owned())
+            ),
+            make_test_case!(
+                i64_vals,
+                TimestampNanosecondArray,
+                TimestampNanosecond,
+                None
+            ),
+            make_test_case!(
+                i64_vals,
+                TimestampNanosecondArray,
+                TimestampNanosecond,
+                Some("UTC".to_owned())
+            ),
             make_test_case!(i32_vals, IntervalYearMonthArray, IntervalYearMonth),
             make_test_case!(i64_vals, IntervalDayTimeArray, IntervalDayTime),
             make_str_dict_test_case!(str_vals, Int8Type, Utf8),
@@ -2897,4 +3055,30 @@ mod tests {
 
         assert_eq!(array, &expected);
     }
+
+    /// A UTC-tagged nanosecond scalar must keep its timezone in three places:
+    /// the type from `get_datatype`, the array from `to_array`, and the scalar
+    /// read back with `try_from_array`.
+    #[test]
+    fn scalar_timestamp_ns_utc_timezone() {
+        let utc = Some("UTC".to_owned());
+        let expected_type =
+            DataType::Timestamp(TimeUnit::Nanosecond, utc.clone());
+
+        // Scalar -> DataType.
+        let scalar = ScalarValue::TimestampNanosecond(
+            Some(1599566400000000000),
+            utc,
+        );
+        assert_eq!(scalar.get_datatype(), expected_type);
+
+        // Scalar -> single-element array.
+        let array = scalar.to_array();
+        assert_eq!(array.len(), 1);
+        assert_eq!(array.data_type(), &expected_type);
+
+        // Array element -> scalar.
+        let newscalar = ScalarValue::try_from_array(&array, 0).unwrap();
+        assert_eq!(newscalar.get_datatype(), expected_type);
+    }
 }
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index b72606f..7c3210d 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -4107,34 +4107,41 @@ fn make_timestamp_table<A>() -> Result<Arc<MemTable>>
 where
     A: ArrowTimestampType,
 {
+    make_timestamp_tz_table::<A>(None)
+}
+
+fn make_timestamp_tz_table<A>(tz: Option<String>) -> Result<Arc<MemTable>>
+where
+    A: ArrowTimestampType,
+{
     let schema = Arc::new(Schema::new(vec![
-        Field::new("ts", DataType::Timestamp(A::get_time_unit(), None), false),
+        Field::new(
+            "ts",
+            DataType::Timestamp(A::get_time_unit(), tz.clone()),
+            false,
+        ),
         Field::new("value", DataType::Int32, true),
     ]));
 
-    let mut builder = PrimitiveBuilder::<A>::new(3);
-
-    let nanotimestamps = vec![
-        1599572549190855000i64, // 2020-09-08T13:42:29.190855+00:00
-        1599568949190855000,    // 2020-09-08T12:42:29.190855+00:00
-        1599565349190855000,    //2020-09-08T11:42:29.190855+00:00
-    ]; // 2020-09-08T11:42:29.190855+00:00
     let divisor = match A::get_time_unit() {
         TimeUnit::Nanosecond => 1,
         TimeUnit::Microsecond => 1000,
         TimeUnit::Millisecond => 1_000_000,
         TimeUnit::Second => 1_000_000_000,
     };
-    for ts in nanotimestamps {
-        builder.append_value(
-            <A as ArrowPrimitiveType>::Native::from_i64(ts / divisor).unwrap(),
-        )?;
-    }
+
+    let timestamps = vec![
+        1599572549190855000i64 / divisor, // 2020-09-08T13:42:29.190855+00:00
+        1599568949190855000 / divisor,    // 2020-09-08T12:42:29.190855+00:00
+        1599565349190855000 / divisor,    //2020-09-08T11:42:29.190855+00:00
+    ]; // 2020-09-08T11:42:29.190855+00:00
+
+    let array = PrimitiveArray::<A>::from_vec(timestamps, tz);
 
     let data = RecordBatch::try_new(
         schema.clone(),
         vec![
-            Arc::new(builder.finish()),
+            Arc::new(array),
             Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
         ],
     )?;
@@ -6615,3 +6622,357 @@ async fn csv_query_with_decimal_by_sql() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn timestamp_minmax() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    let table_a = make_timestamp_tz_table::<TimestampMillisecondType>(None)?;
+    let table_b =
+        make_timestamp_tz_table::<TimestampNanosecondType>(Some("UTC".to_owned()))?;
+    ctx.register_table("table_a", table_a)?;
+    ctx.register_table("table_b", table_b)?;
+    // MIN over the tz-less millisecond column, MAX over the "UTC" nanosecond column.
+    let sql = "SELECT MIN(table_a.ts), MAX(table_b.ts) FROM table_a, table_b";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+-------------------------+----------------------------+",
+        "| MIN(table_a.ts)         | MAX(table_b.ts)            |",
+        "+-------------------------+----------------------------+",
+        "| 2020-09-08 11:42:29.190 | 2020-09-08 13:42:29.190855 |",
+        "+-------------------------+----------------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn timestamp_coercion() -> Result<()> {
+    {
+        let mut ctx = ExecutionContext::new();
+        let table_a =
+            make_timestamp_tz_table::<TimestampSecondType>(Some("UTC".to_owned()))?;
+        let table_b =
+            make_timestamp_tz_table::<TimestampMillisecondType>(Some("UTC".to_owned()))?;
+        ctx.register_table("table_a", table_a)?;
+        ctx.register_table("table_b", table_b)?;
+        // 3x3 cross join per case; `=` is expected true only for same-sample rows. Case: Second vs Millisecond, both "UTC".
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+---------------------+-------------------------+--------------------------+",
+            "| ts                  | ts                      | table_a.ts Eq table_b.ts |",
+            "+---------------------+-------------------------+--------------------------+",
+            "| 2020-09-08 13:42:29 | 2020-09-08 13:42:29.190 | true                     |",
+            "| 2020-09-08 13:42:29 | 2020-09-08 12:42:29.190 | false                    |",
+            "| 2020-09-08 13:42:29 | 2020-09-08 11:42:29.190 | false                    |",
+            "| 2020-09-08 12:42:29 | 2020-09-08 13:42:29.190 | false                    |",
+            "| 2020-09-08 12:42:29 | 2020-09-08 12:42:29.190 | true                     |",
+            "| 2020-09-08 12:42:29 | 2020-09-08 11:42:29.190 | false                    |",
+            "| 2020-09-08 11:42:29 | 2020-09-08 13:42:29.190 | false                    |",
+            "| 2020-09-08 11:42:29 | 2020-09-08 12:42:29.190 | false                    |",
+            "| 2020-09-08 11:42:29 | 2020-09-08 11:42:29.190 | true                     |",
+            "+---------------------+-------------------------+--------------------------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+
+    {
+        let mut ctx = ExecutionContext::new();
+        let table_a = make_timestamp_table::<TimestampSecondType>()?;
+        let table_b = make_timestamp_table::<TimestampMicrosecondType>()?;
+        ctx.register_table("table_a", table_a)?;
+        ctx.register_table("table_b", table_b)?;
+        // Case: Second vs Microsecond, no timezone on either side.
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+---------------------+----------------------------+--------------------------+",
+            "| ts                  | ts                         | table_a.ts Eq table_b.ts |",
+            "+---------------------+----------------------------+--------------------------+",
+            "| 2020-09-08 13:42:29 | 2020-09-08 13:42:29.190855 | true                     |",
+            "| 2020-09-08 13:42:29 | 2020-09-08 12:42:29.190855 | false                    |",
+            "| 2020-09-08 13:42:29 | 2020-09-08 11:42:29.190855 | false                    |",
+            "| 2020-09-08 12:42:29 | 2020-09-08 13:42:29.190855 | false                    |",
+            "| 2020-09-08 12:42:29 | 2020-09-08 12:42:29.190855 | true                     |",
+            "| 2020-09-08 12:42:29 | 2020-09-08 11:42:29.190855 | false                    |",
+            "| 2020-09-08 11:42:29 | 2020-09-08 13:42:29.190855 | false                    |",
+            "| 2020-09-08 11:42:29 | 2020-09-08 12:42:29.190855 | false                    |",
+            "| 2020-09-08 11:42:29 | 2020-09-08 11:42:29.190855 | true                     |",
+            "+---------------------+----------------------------+--------------------------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+
+    {
+        let mut ctx = ExecutionContext::new();
+        let table_a = make_timestamp_table::<TimestampSecondType>()?;
+        let table_b = make_timestamp_table::<TimestampNanosecondType>()?;
+        ctx.register_table("table_a", table_a)?;
+        ctx.register_table("table_b", table_b)?;
+        // Case: Second vs Nanosecond, no timezone on either side.
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+---------------------+----------------------------+--------------------------+",
+            "| ts                  | ts                         | table_a.ts Eq table_b.ts |",
+            "+---------------------+----------------------------+--------------------------+",
+            "| 2020-09-08 13:42:29 | 2020-09-08 13:42:29.190855 | true                     |",
+            "| 2020-09-08 13:42:29 | 2020-09-08 12:42:29.190855 | false                    |",
+            "| 2020-09-08 13:42:29 | 2020-09-08 11:42:29.190855 | false                    |",
+            "| 2020-09-08 12:42:29 | 2020-09-08 13:42:29.190855 | false                    |",
+            "| 2020-09-08 12:42:29 | 2020-09-08 12:42:29.190855 | true                     |",
+            "| 2020-09-08 12:42:29 | 2020-09-08 11:42:29.190855 | false                    |",
+            "| 2020-09-08 11:42:29 | 2020-09-08 13:42:29.190855 | false                    |",
+            "| 2020-09-08 11:42:29 | 2020-09-08 12:42:29.190855 | false                    |",
+            "| 2020-09-08 11:42:29 | 2020-09-08 11:42:29.190855 | true                     |",
+            "+---------------------+----------------------------+--------------------------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+
+    {
+        let mut ctx = ExecutionContext::new();
+        let table_a = make_timestamp_table::<TimestampMillisecondType>()?;
+        let table_b = make_timestamp_table::<TimestampSecondType>()?;
+        ctx.register_table("table_a", table_a)?;
+        ctx.register_table("table_b", table_b)?;
+        // Case: Millisecond vs Second, no timezone on either side.
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+-------------------------+---------------------+--------------------------+",
+            "| ts                      | ts                  | table_a.ts Eq table_b.ts |",
+            "+-------------------------+---------------------+--------------------------+",
+            "| 2020-09-08 13:42:29.190 | 2020-09-08 13:42:29 | true                     |",
+            "| 2020-09-08 13:42:29.190 | 2020-09-08 12:42:29 | false                    |",
+            "| 2020-09-08 13:42:29.190 | 2020-09-08 11:42:29 | false                    |",
+            "| 2020-09-08 12:42:29.190 | 2020-09-08 13:42:29 | false                    |",
+            "| 2020-09-08 12:42:29.190 | 2020-09-08 12:42:29 | true                     |",
+            "| 2020-09-08 12:42:29.190 | 2020-09-08 11:42:29 | false                    |",
+            "| 2020-09-08 11:42:29.190 | 2020-09-08 13:42:29 | false                    |",
+            "| 2020-09-08 11:42:29.190 | 2020-09-08 12:42:29 | false                    |",
+            "| 2020-09-08 11:42:29.190 | 2020-09-08 11:42:29 | true                     |",
+            "+-------------------------+---------------------+--------------------------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+
+    {
+        let mut ctx = ExecutionContext::new();
+        let table_a = make_timestamp_table::<TimestampMillisecondType>()?;
+        let table_b = make_timestamp_table::<TimestampMicrosecondType>()?;
+        ctx.register_table("table_a", table_a)?;
+        ctx.register_table("table_b", table_b)?;
+        // Case: Millisecond vs Microsecond, no timezone on either side.
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+-------------------------+----------------------------+--------------------------+",
+            "| ts                      | ts                         | table_a.ts Eq table_b.ts |",
+            "+-------------------------+----------------------------+--------------------------+",
+            "| 2020-09-08 13:42:29.190 | 2020-09-08 13:42:29.190855 | true                     |",
+            "| 2020-09-08 13:42:29.190 | 2020-09-08 12:42:29.190855 | false                    |",
+            "| 2020-09-08 13:42:29.190 | 2020-09-08 11:42:29.190855 | false                    |",
+            "| 2020-09-08 12:42:29.190 | 2020-09-08 13:42:29.190855 | false                    |",
+            "| 2020-09-08 12:42:29.190 | 2020-09-08 12:42:29.190855 | true                     |",
+            "| 2020-09-08 12:42:29.190 | 2020-09-08 11:42:29.190855 | false                    |",
+            "| 2020-09-08 11:42:29.190 | 2020-09-08 13:42:29.190855 | false                    |",
+            "| 2020-09-08 11:42:29.190 | 2020-09-08 12:42:29.190855 | false                    |",
+            "| 2020-09-08 11:42:29.190 | 2020-09-08 11:42:29.190855 | true                     |",
+            "+-------------------------+----------------------------+--------------------------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+
+    {
+        let mut ctx = ExecutionContext::new();
+        let table_a = make_timestamp_table::<TimestampMillisecondType>()?;
+        let table_b = make_timestamp_table::<TimestampNanosecondType>()?;
+        ctx.register_table("table_a", table_a)?;
+        ctx.register_table("table_b", table_b)?;
+        // Case: Millisecond vs Nanosecond, no timezone on either side.
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+-------------------------+----------------------------+--------------------------+",
+            "| ts                      | ts                         | table_a.ts Eq table_b.ts |",
+            "+-------------------------+----------------------------+--------------------------+",
+            "| 2020-09-08 13:42:29.190 | 2020-09-08 13:42:29.190855 | true                     |",
+            "| 2020-09-08 13:42:29.190 | 2020-09-08 12:42:29.190855 | false                    |",
+            "| 2020-09-08 13:42:29.190 | 2020-09-08 11:42:29.190855 | false                    |",
+            "| 2020-09-08 12:42:29.190 | 2020-09-08 13:42:29.190855 | false                    |",
+            "| 2020-09-08 12:42:29.190 | 2020-09-08 12:42:29.190855 | true                     |",
+            "| 2020-09-08 12:42:29.190 | 2020-09-08 11:42:29.190855 | false                    |",
+            "| 2020-09-08 11:42:29.190 | 2020-09-08 13:42:29.190855 | false                    |",
+            "| 2020-09-08 11:42:29.190 | 2020-09-08 12:42:29.190855 | false                    |",
+            "| 2020-09-08 11:42:29.190 | 2020-09-08 11:42:29.190855 | true                     |",
+            "+-------------------------+----------------------------+--------------------------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+
+    {
+        let mut ctx = ExecutionContext::new();
+        let table_a = make_timestamp_table::<TimestampMicrosecondType>()?;
+        let table_b = make_timestamp_table::<TimestampSecondType>()?;
+        ctx.register_table("table_a", table_a)?;
+        ctx.register_table("table_b", table_b)?;
+        // Case: Microsecond vs Second, no timezone on either side.
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+----------------------------+---------------------+--------------------------+",
+            "| ts                         | ts                  | table_a.ts Eq table_b.ts |",
+            "+----------------------------+---------------------+--------------------------+",
+            "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29 | true                     |",
+            "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29 | false                    |",
+            "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29 | false                    |",
+            "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29 | false                    |",
+            "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29 | true                     |",
+            "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29 | false                    |",
+            "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29 | false                    |",
+            "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29 | false                    |",
+            "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29 | true                     |",
+            "+----------------------------+---------------------+--------------------------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+
+    {
+        let mut ctx = ExecutionContext::new();
+        let table_a = make_timestamp_table::<TimestampMicrosecondType>()?;
+        let table_b = make_timestamp_table::<TimestampMillisecondType>()?;
+        ctx.register_table("table_a", table_a)?;
+        ctx.register_table("table_b", table_b)?;
+        // Case: Microsecond vs Millisecond, no timezone on either side.
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+----------------------------+-------------------------+--------------------------+",
+            "| ts                         | ts                      | table_a.ts Eq table_b.ts |",
+            "+----------------------------+-------------------------+--------------------------+",
+            "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.190 | true                     |",
+            "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29.190 | false                    |",
+            "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29.190 | false                    |",
+            "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29.190 | false                    |",
+            "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.190 | true                     |",
+            "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29.190 | false                    |",
+            "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29.190 | false                    |",
+            "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29.190 | false                    |",
+            "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.190 | true                     |",
+            "+----------------------------+-------------------------+--------------------------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+
+    {
+        let mut ctx = ExecutionContext::new();
+        let table_a = make_timestamp_table::<TimestampMicrosecondType>()?;
+        let table_b = make_timestamp_table::<TimestampNanosecondType>()?;
+        ctx.register_table("table_a", table_a)?;
+        ctx.register_table("table_b", table_b)?;
+        // Case: Microsecond vs Nanosecond, no timezone on either side.
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+----------------------------+----------------------------+--------------------------+",
+            "| ts                         | ts                         | table_a.ts Eq table_b.ts |",
+            "+----------------------------+----------------------------+--------------------------+",
+            "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.190855 | true                     |",
+            "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29.190855 | false                    |",
+            "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29.190855 | false                    |",
+            "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29.190855 | false                    |",
+            "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.190855 | true                     |",
+            "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29.190855 | false                    |",
+            "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29.190855 | false                    |",
+            "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29.190855 | false                    |",
+            "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.190855 | true                     |",
+            "+----------------------------+----------------------------+--------------------------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+
+    {
+        let mut ctx = ExecutionContext::new();
+        let table_a = make_timestamp_table::<TimestampNanosecondType>()?;
+        let table_b = make_timestamp_table::<TimestampSecondType>()?;
+        ctx.register_table("table_a", table_a)?;
+        ctx.register_table("table_b", table_b)?;
+        // Case: Nanosecond vs Second, no timezone on either side.
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+----------------------------+---------------------+--------------------------+",
+            "| ts                         | ts                  | table_a.ts Eq table_b.ts |",
+            "+----------------------------+---------------------+--------------------------+",
+            "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29 | true                     |",
+            "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29 | false                    |",
+            "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29 | false                    |",
+            "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29 | false                    |",
+            "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29 | true                     |",
+            "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29 | false                    |",
+            "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29 | false                    |",
+            "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29 | false                    |",
+            "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29 | true                     |",
+            "+----------------------------+---------------------+--------------------------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+
+    {
+        let mut ctx = ExecutionContext::new();
+        let table_a = make_timestamp_table::<TimestampNanosecondType>()?;
+        let table_b = make_timestamp_table::<TimestampMillisecondType>()?;
+        ctx.register_table("table_a", table_a)?;
+        ctx.register_table("table_b", table_b)?;
+        // Case: Nanosecond vs Millisecond, no timezone on either side.
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+----------------------------+-------------------------+--------------------------+",
+            "| ts                         | ts                      | table_a.ts Eq table_b.ts |",
+            "+----------------------------+-------------------------+--------------------------+",
+            "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.190 | true                     |",
+            "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29.190 | false                    |",
+            "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29.190 | false                    |",
+            "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29.190 | false                    |",
+            "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.190 | true                     |",
+            "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29.190 | false                    |",
+            "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29.190 | false                    |",
+            "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29.190 | false                    |",
+            "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.190 | true                     |",
+            "+----------------------------+-------------------------+--------------------------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+
+    {
+        let mut ctx = ExecutionContext::new();
+        let table_a = make_timestamp_table::<TimestampNanosecondType>()?;
+        let table_b = make_timestamp_table::<TimestampMicrosecondType>()?;
+        ctx.register_table("table_a", table_a)?;
+        ctx.register_table("table_b", table_b)?;
+        // Case: Nanosecond vs Microsecond, no timezone on either side.
+        let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+----------------------------+----------------------------+--------------------------+",
+            "| ts                         | ts                         | table_a.ts Eq table_b.ts |",
+            "+----------------------------+----------------------------+--------------------------+",
+            "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.190855 | true                     |",
+            "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29.190855 | false                    |",
+            "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29.190855 | false                    |",
+            "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29.190855 | false                    |",
+            "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.190855 | true                     |",
+            "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29.190855 | false                    |",
+            "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29.190855 | false                    |",
+            "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29.190855 | false                    |",
+            "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.190855 | true                     |",
+            "+----------------------------+----------------------------+--------------------------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+
+    Ok(())
+}