Posted to commits@arrow.apache.org by al...@apache.org on 2021/06/19 11:06:55 UTC

[arrow-datafusion] branch master updated: `to_timestamp_millis()`, `to_timestamp_micros()`, `to_timestamp_seconds()` (#567)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 5900b4c  `to_timestamp_millis()`, `to_timestamp_micros()`, `to_timestamp_seconds()` (#567)
5900b4c is described below

commit 5900b4c6829b0bdbe69e1f95fb74e935bc8f33d4
Author: Evan Chan <ve...@gmail.com>
AuthorDate: Sat Jun 19 04:06:47 2021 -0700

    `to_timestamp_millis()`, `to_timestamp_micros()`, `to_timestamp_seconds()` (#567)
    
    * to_timestamp_millis(): support casting to Timestamp(Milliseconds, _) from Int64
    
    * Add testing setup to instructions
    
    * to_timestamp_millis(): Convert timestamp strings to TimestampMillis
    
    * [functions] Add to_timestamp_micros() and to_timestamp_seconds() functions
    
    * Update datafusion/tests/sql.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * CR feedback and fix build
    
    * Add ability for to_timestamp_xxx() functions to cast from other Timestamp types
    
    * For consistency, let to_timestamp() also perform casts
    
    * Prettier / clippy
    
    * Add docs for to_timestamp() functions
    
    Co-authored-by: Evan Chan <ev...@urbanlogiq.com>
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 DEVELOPERS.md                                      |   9 +-
 README.md                                          |  54 +++--
 datafusion/Cargo.toml                              |   4 +-
 .../src/physical_plan/datetime_expressions.rs      |  32 ++-
 datafusion/src/physical_plan/expressions/binary.rs |  19 ++
 datafusion/src/physical_plan/expressions/cast.rs   |  38 +--
 datafusion/src/physical_plan/expressions/mod.rs    |   4 +-
 datafusion/src/physical_plan/expressions/nullif.rs |   6 +-
 datafusion/src/physical_plan/functions.rs          | 161 ++++++++++++-
 datafusion/src/scalar.rs                           |   5 +-
 datafusion/tests/sql.rs                            | 267 ++++++++++++++++++++-
 docs/user-guide/src/SUMMARY.md                     |   1 +
 docs/user-guide/src/sql/datafusion-functions.md    |  86 +++++++
 13 files changed, 618 insertions(+), 68 deletions(-)
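
Note on the string path: the new `to_timestamp_millis()`, `to_timestamp_micros()`, and `to_timestamp_seconds()` functions in `datetime_expressions.rs` parse a string to nanoseconds and then divide by 1_000_000, 1_000, or 1_000_000_000 respectively. The following is a minimal, dependency-free sketch of that rescaling step only. The `Unit` enum and `rescale_nanos` function are hypothetical names introduced for illustration; they are not part of DataFusion or arrow, and the Int64/Timestamp inputs in the commit go through arrow's cast kernel rather than code like this.

```rust
// Divisors matching the closures added in datetime_expressions.rs.
const NANOS_PER_MICRO: i64 = 1_000;
const NANOS_PER_MILLI: i64 = 1_000_000;
const NANOS_PER_SECOND: i64 = 1_000_000_000;

/// Hypothetical stand-in for arrow's `TimeUnit`, so the sketch needs no crates.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Unit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

/// Rescale a nanosecond epoch value to the requested unit.
///
/// Uses plain `i64` division, as the commit's closures do. Rust integer
/// division truncates toward zero, so sub-unit precision is dropped and
/// small negative values (just before the epoch) collapse to 0.
pub fn rescale_nanos(nanos: i64, unit: Unit) -> i64 {
    match unit {
        Unit::Second => nanos / NANOS_PER_SECOND,
        Unit::Millisecond => nanos / NANOS_PER_MILLI,
        Unit::Microsecond => nanos / NANOS_PER_MICRO,
        Unit::Nanosecond => nanos,
    }
}
```

For instance, `rescale_nanos(1_235_865_600_000_000_000, Unit::Millisecond)` yields `1_235_865_600_000`, which is the first Int64 value used in the `query_cast_timestamp_millis` test. Because the division truncates, `rescale_nanos(-1, Unit::Millisecond)` is `0` rather than `-1`; whether that matters for pre-1970 timestamps is left as an open question here.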

diff --git a/DEVELOPERS.md b/DEVELOPERS.md
index cd0792f..8538468 100644
--- a/DEVELOPERS.md
+++ b/DEVELOPERS.md
@@ -33,6 +33,13 @@ DataFusion is written in Rust and it uses a standard rust toolkit:
 - `cargo test` to test
 - etc.
 
+Testing setup:
+
+- `git submodule init`
+- `git submodule update`
+- `export PARQUET_TEST_DATA=parquet_testing/`
+- `export ARROW_TEST_DATA=testing/data/`
+
 ## How to add a new scalar function
 
 Below is a checklist of what you need to do to add a new scalar function to DataFusion:
@@ -47,7 +54,7 @@ Below is a checklist of what you need to do to add a new scalar function to Data
   - a new entry to `FromStr` with the name of the function as called by SQL
   - a new line in `return_type` with the expected return type of the function, given an incoming type
   - a new line in `signature` with the signature of the function (number and types of its arguments)
-  - a new line in `create_physical_expr` mapping the built-in to the implementation
+  - a new line in `create_physical_expr`/`create_physical_fun` mapping the built-in to the implementation
   - tests to the function.
 - In [tests/sql.rs](datafusion/tests/sql.rs), add a new test where the function is called through SQL against well known data and returns the expected result.
 - In [src/logical_plan/expr](datafusion/src/logical_plan/expr.rs), add:
diff --git a/README.md b/README.md
index 730bbc3..195d1a7 100644
--- a/README.md
+++ b/README.md
@@ -197,6 +197,10 @@ DataFusion also includes a simple command-line interactive SQL utility. See the
   - [ ] Basic date functions
   - [ ] Basic time functions
   - [x] Basic timestamp functions
+    - [x] [to_timestamp](docs/user-guide/book/sql/datafusion-functions.html#to_timestamp)
+    - [x] [to_timestamp_millis](docs/user-guide/book/sql/datafusion-functions.html#to_timestamp_millis)
+    - [x] [to_timestamp_micros](docs/user-guide/book/sql/datafusion-functions.html#to_timestamp_micros)
+    - [x] [to_timestamp_seconds](docs/user-guide/book/sql/datafusion-functions.html#to_timestamp_seconds)
 - nested functions
   - [x] Array of columns
 - [x] Schema Queries
@@ -320,31 +324,31 @@ execution. The SQL types from
 [sqlparser-rs](https://github.com/ballista-compute/sqlparser-rs/blob/main/src/ast/data_type.rs#L57)
 are mapped to Arrow types according to the following table
 
-| SQL Data Type | Arrow DataType                  |
-| ------------- | ------------------------------- |
-| `CHAR`        | `Utf8`                          |
-| `VARCHAR`     | `Utf8`                          |
-| `UUID`        | _Not yet supported_             |
-| `CLOB`        | _Not yet supported_             |
-| `BINARY`      | _Not yet supported_             |
-| `VARBINARY`   | _Not yet supported_             |
-| `DECIMAL`     | `Float64`                       |
-| `FLOAT`       | `Float32`                       |
-| `SMALLINT`    | `Int16`                         |
-| `INT`         | `Int32`                         |
-| `BIGINT`      | `Int64`                         |
-| `REAL`        | `Float64`                       |
-| `DOUBLE`      | `Float64`                       |
-| `BOOLEAN`     | `Boolean`                       |
-| `DATE`        | `Date32`                        |
-| `TIME`        | `Time64(TimeUnit::Millisecond)` |
-| `TIMESTAMP`   | `Date64`                        |
-| `INTERVAL`    | _Not yet supported_             |
-| `REGCLASS`    | _Not yet supported_             |
-| `TEXT`        | _Not yet supported_             |
-| `BYTEA`       | _Not yet supported_             |
-| `CUSTOM`      | _Not yet supported_             |
-| `ARRAY`       | _Not yet supported_             |
+| SQL Data Type | Arrow DataType                    |
+| ------------- | --------------------------------- |
+| `CHAR`        | `Utf8`                            |
+| `VARCHAR`     | `Utf8`                            |
+| `UUID`        | _Not yet supported_               |
+| `CLOB`        | _Not yet supported_               |
+| `BINARY`      | _Not yet supported_               |
+| `VARBINARY`   | _Not yet supported_               |
+| `DECIMAL`     | `Float64`                         |
+| `FLOAT`       | `Float32`                         |
+| `SMALLINT`    | `Int16`                           |
+| `INT`         | `Int32`                           |
+| `BIGINT`      | `Int64`                           |
+| `REAL`        | `Float64`                         |
+| `DOUBLE`      | `Float64`                         |
+| `BOOLEAN`     | `Boolean`                         |
+| `DATE`        | `Date32`                          |
+| `TIME`        | `Time64(TimeUnit::Millisecond)`   |
+| `TIMESTAMP`   | `Timestamp(TimeUnit::Nanosecond)` |
+| `INTERVAL`    | _Not yet supported_               |
+| `REGCLASS`    | _Not yet supported_               |
+| `TEXT`        | _Not yet supported_               |
+| `BYTEA`       | _Not yet supported_               |
+| `CUSTOM`      | _Not yet supported_               |
+| `ARRAY`       | _Not yet supported_               |
 
 # Architecture Overview
 
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index 0668ec0..5da2469 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -46,8 +46,8 @@ unicode_expressions = ["unicode-segmentation"]
 [dependencies]
 ahash = "0.7"
 hashbrown = "0.11"
-arrow = { version = "4.0", features = ["prettyprint"] }
-parquet = { version = "4.0", features = ["arrow"] }
+arrow = { version = "4.3", features = ["prettyprint"] }
+parquet = { version = "4.3", features = ["arrow"] }
 sqlparser = "0.9.0"
 paste = "^1.0"
 num_cpus = "1.13.0"
diff --git a/datafusion/src/physical_plan/datetime_expressions.rs b/datafusion/src/physical_plan/datetime_expressions.rs
index ec52e6b..e17ded2 100644
--- a/datafusion/src/physical_plan/datetime_expressions.rs
+++ b/datafusion/src/physical_plan/datetime_expressions.rs
@@ -25,7 +25,10 @@ use crate::{
 };
 use arrow::{
     array::{Array, ArrayRef, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait},
-    datatypes::{ArrowPrimitiveType, DataType, TimestampNanosecondType},
+    datatypes::{
+        ArrowPrimitiveType, DataType, TimestampMicrosecondType, TimestampMillisecondType,
+        TimestampNanosecondType, TimestampSecondType,
+    },
 };
 use arrow::{
     array::{
@@ -268,6 +271,33 @@ pub fn to_timestamp(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     )
 }
 
+/// to_timestamp_millis SQL function
+pub fn to_timestamp_millis(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    handle::<TimestampMillisecondType, _, TimestampMillisecondType>(
+        args,
+        |s| string_to_timestamp_nanos(s).map(|n| n / 1_000_000),
+        "to_timestamp_millis",
+    )
+}
+
+/// to_timestamp_micros SQL function
+pub fn to_timestamp_micros(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    handle::<TimestampMicrosecondType, _, TimestampMicrosecondType>(
+        args,
+        |s| string_to_timestamp_nanos(s).map(|n| n / 1_000),
+        "to_timestamp_micros",
+    )
+}
+
+/// to_timestamp_seconds SQL function
+pub fn to_timestamp_seconds(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    handle::<TimestampSecondType, _, TimestampSecondType>(
+        args,
+        |s| string_to_timestamp_nanos(s).map(|n| n / 1_000_000_000),
+        "to_timestamp_seconds",
+    )
+}
+
 /// Create an implementation of `now()` that always returns the
 /// specified timestamp.
 ///
diff --git a/datafusion/src/physical_plan/expressions/binary.rs b/datafusion/src/physical_plan/expressions/binary.rs
index 5c2d9ce..5ed0c74 100644
--- a/datafusion/src/physical_plan/expressions/binary.rs
+++ b/datafusion/src/physical_plan/expressions/binary.rs
@@ -17,6 +17,7 @@
 
 use std::{any::Any, sync::Arc};
 
+use arrow::array::TimestampMillisecondArray;
 use arrow::array::*;
 use arrow::compute::kernels::arithmetic::{
     add, divide, divide_scalar, multiply, subtract,
@@ -256,6 +257,15 @@ macro_rules! binary_array_op_scalar {
             DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                 compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
             }
+            DataType::Timestamp(TimeUnit::Microsecond, None) => {
+                compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Millisecond, None) => {
+                compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMillisecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Second, None) => {
+                compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampSecondArray)
+            }
             DataType::Date32 => {
                 compute_op_scalar!($LEFT, $RIGHT, $OP, Date32Array)
             }
@@ -288,6 +298,15 @@ macro_rules! binary_array_op {
             DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                 compute_op!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
             }
+            DataType::Timestamp(TimeUnit::Microsecond, None) => {
+                compute_op!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Millisecond, None) => {
+                compute_op!($LEFT, $RIGHT, $OP, TimestampMillisecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Second, None) => {
+                compute_op!($LEFT, $RIGHT, $OP, TimestampSecondArray)
+            }
             DataType::Date32 => {
                 compute_op!($LEFT, $RIGHT, $OP, Date32Array)
             }
diff --git a/datafusion/src/physical_plan/expressions/cast.rs b/datafusion/src/physical_plan/expressions/cast.rs
index ba395f5..558b1e5 100644
--- a/datafusion/src/physical_plan/expressions/cast.rs
+++ b/datafusion/src/physical_plan/expressions/cast.rs
@@ -91,24 +91,26 @@ impl PhysicalExpr for CastExpr {
 
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
         let value = self.expr.evaluate(batch)?;
-        match value {
-            ColumnarValue::Array(array) => {
-                Ok(ColumnarValue::Array(kernels::cast::cast_with_options(
-                    &array,
-                    &self.cast_type,
-                    &self.cast_options,
-                )?))
-            }
-            ColumnarValue::Scalar(scalar) => {
-                let scalar_array = scalar.to_array();
-                let cast_array = kernels::cast::cast_with_options(
-                    &scalar_array,
-                    &self.cast_type,
-                    &self.cast_options,
-                )?;
-                let cast_scalar = ScalarValue::try_from_array(&cast_array, 0)?;
-                Ok(ColumnarValue::Scalar(cast_scalar))
-            }
+        cast_column(&value, &self.cast_type, &self.cast_options)
+    }
+}
+
+/// Internal cast function for casting ColumnarValue -> ColumnarValue for cast_type
+pub fn cast_column(
+    value: &ColumnarValue,
+    cast_type: &DataType,
+    cast_options: &CastOptions,
+) -> Result<ColumnarValue> {
+    match value {
+        ColumnarValue::Array(array) => Ok(ColumnarValue::Array(
+            kernels::cast::cast_with_options(array, cast_type, cast_options)?,
+        )),
+        ColumnarValue::Scalar(scalar) => {
+            let scalar_array = scalar.to_array();
+            let cast_array =
+                kernels::cast::cast_with_options(&scalar_array, cast_type, cast_options)?;
+            let cast_scalar = ScalarValue::try_from_array(&cast_array, 0)?;
+            Ok(ColumnarValue::Scalar(cast_scalar))
         }
     }
 }
diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs
index d18365c..f8cb40c 100644
--- a/datafusion/src/physical_plan/expressions/mod.rs
+++ b/datafusion/src/physical_plan/expressions/mod.rs
@@ -49,7 +49,9 @@ mod try_cast;
 pub use average::{avg_return_type, Avg, AvgAccumulator};
 pub use binary::{binary, binary_operator_data_type, BinaryExpr};
 pub use case::{case, CaseExpr};
-pub use cast::{cast, cast_with_options, CastExpr};
+pub use cast::{
+    cast, cast_column, cast_with_options, CastExpr, DEFAULT_DATAFUSION_CAST_OPTIONS,
+};
 pub use column::{col, Column};
 pub use count::Count;
 pub use in_list::{in_list, InListExpr};
diff --git a/datafusion/src/physical_plan/expressions/nullif.rs b/datafusion/src/physical_plan/expressions/nullif.rs
index 7cc58ed..55e7bda 100644
--- a/datafusion/src/physical_plan/expressions/nullif.rs
+++ b/datafusion/src/physical_plan/expressions/nullif.rs
@@ -21,11 +21,7 @@ use super::ColumnarValue;
 use crate::error::{DataFusionError, Result};
 use crate::scalar::ScalarValue;
 use arrow::array::Array;
-use arrow::array::{
-    ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
-    Int16Array, Int32Array, Int64Array, Int8Array, StringArray, TimestampNanosecondArray,
-    UInt16Array, UInt32Array, UInt64Array, UInt8Array,
-};
+use arrow::array::*;
 use arrow::compute::kernels::boolean::nullif;
 use arrow::compute::kernels::comparison::{eq, eq_scalar, eq_utf8, eq_utf8_scalar};
 use arrow::datatypes::{DataType, TimeUnit};
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index 1e423c3..0e2be51 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -36,7 +36,9 @@ use super::{
 use crate::execution::context::ExecutionContextState;
 use crate::physical_plan::array_expressions;
 use crate::physical_plan::datetime_expressions;
-use crate::physical_plan::expressions::{nullif_func, SUPPORTED_NULLIF_TYPES};
+use crate::physical_plan::expressions::{
+    cast_column, nullif_func, DEFAULT_DATAFUSION_CAST_OPTIONS, SUPPORTED_NULLIF_TYPES,
+};
 use crate::physical_plan::math_expressions;
 use crate::physical_plan::string_expressions;
 use crate::{
@@ -205,6 +207,12 @@ pub enum BuiltinScalarFunction {
     ToHex,
     /// to_timestamp
     ToTimestamp,
+    /// to_timestamp_millis
+    ToTimestampMillis,
+    /// to_timestamp_micros
+    ToTimestampMicros,
+    /// to_timestamp_seconds
+    ToTimestampSeconds,
     ///now
     Now,
     /// translate
@@ -298,6 +306,9 @@ impl FromStr for BuiltinScalarFunction {
             "substr" => BuiltinScalarFunction::Substr,
             "to_hex" => BuiltinScalarFunction::ToHex,
             "to_timestamp" => BuiltinScalarFunction::ToTimestamp,
+            "to_timestamp_millis" => BuiltinScalarFunction::ToTimestampMillis,
+            "to_timestamp_micros" => BuiltinScalarFunction::ToTimestampMicros,
+            "to_timestamp_seconds" => BuiltinScalarFunction::ToTimestampSeconds,
             "now" => BuiltinScalarFunction::Now,
             "translate" => BuiltinScalarFunction::Translate,
             "trim" => BuiltinScalarFunction::Trim,
@@ -412,6 +423,15 @@ pub fn return_type(
         BuiltinScalarFunction::ToTimestamp => {
             Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
         }
+        BuiltinScalarFunction::ToTimestampMillis => {
+            Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
+        }
+        BuiltinScalarFunction::ToTimestampMicros => {
+            Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
+        }
+        BuiltinScalarFunction::ToTimestampSeconds => {
+            Ok(DataType::Timestamp(TimeUnit::Second, None))
+        }
         BuiltinScalarFunction::Now => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
         BuiltinScalarFunction::Translate => utf8_to_str_type(&arg_types[0], "translate"),
         BuiltinScalarFunction::Trim => utf8_to_str_type(&arg_types[0], "trim"),
@@ -896,9 +916,6 @@ pub fn create_physical_fun(
                 other,
             ))),
         }),
-        BuiltinScalarFunction::ToTimestamp => {
-            Arc::new(datetime_expressions::to_timestamp)
-        }
         BuiltinScalarFunction::Translate => Arc::new(|args| match args[0].data_type() {
             DataType::Utf8 => {
                 let func = invoke_if_unicode_expressions_feature_flag!(
@@ -934,6 +951,12 @@ pub fn create_physical_fun(
             ))),
         }),
         BuiltinScalarFunction::Upper => Arc::new(string_expressions::upper),
+        _ => {
+            return Err(DataFusionError::Internal(format!(
+                "create_physical_fun: Unsupported scalar function {:?}",
+                fun
+            )))
+        }
     })
 }
 
@@ -945,7 +968,94 @@ pub fn create_physical_expr(
     input_schema: &Schema,
     ctx_state: &ExecutionContextState,
 ) -> Result<Arc<dyn PhysicalExpr>> {
-    let fun_expr = create_physical_fun(fun, ctx_state)?;
+    let fun_expr: ScalarFunctionImplementation = match fun {
+        // These functions need args and input schema to pick an implementation
+        // Unlike the string functions, which actually figure out the function to use with each array,
+        // here we return either a cast fn or string timestamp translation based on the expression data type
+        // so we don't have to pay a per-array/batch cost.
+        BuiltinScalarFunction::ToTimestamp => {
+            Arc::new(match args[0].data_type(input_schema) {
+                Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => {
+                    |col_values: &[ColumnarValue]| {
+                        cast_column(
+                            &col_values[0],
+                            &DataType::Timestamp(TimeUnit::Nanosecond, None),
+                            &DEFAULT_DATAFUSION_CAST_OPTIONS,
+                        )
+                    }
+                }
+                Ok(DataType::Utf8) => datetime_expressions::to_timestamp,
+                other => {
+                    return Err(DataFusionError::Internal(format!(
+                        "Unsupported data type {:?} for function to_timestamp",
+                        other,
+                    )))
+                }
+            })
+        }
+        BuiltinScalarFunction::ToTimestampMillis => {
+            Arc::new(match args[0].data_type(input_schema) {
+                Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => {
+                    |col_values: &[ColumnarValue]| {
+                        cast_column(
+                            &col_values[0],
+                            &DataType::Timestamp(TimeUnit::Millisecond, None),
+                            &DEFAULT_DATAFUSION_CAST_OPTIONS,
+                        )
+                    }
+                }
+                Ok(DataType::Utf8) => datetime_expressions::to_timestamp_millis,
+                other => {
+                    return Err(DataFusionError::Internal(format!(
+                        "Unsupported data type {:?} for function to_timestamp_millis",
+                        other,
+                    )))
+                }
+            })
+        }
+        BuiltinScalarFunction::ToTimestampMicros => {
+            Arc::new(match args[0].data_type(input_schema) {
+                Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => {
+                    |col_values: &[ColumnarValue]| {
+                        cast_column(
+                            &col_values[0],
+                            &DataType::Timestamp(TimeUnit::Microsecond, None),
+                            &DEFAULT_DATAFUSION_CAST_OPTIONS,
+                        )
+                    }
+                }
+                Ok(DataType::Utf8) => datetime_expressions::to_timestamp_micros,
+                other => {
+                    return Err(DataFusionError::Internal(format!(
+                        "Unsupported data type {:?} for function to_timestamp_micros",
+                        other,
+                    )))
+                }
+            })
+        }
+        BuiltinScalarFunction::ToTimestampSeconds => Arc::new({
+            match args[0].data_type(input_schema) {
+                Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => {
+                    |col_values: &[ColumnarValue]| {
+                        cast_column(
+                            &col_values[0],
+                            &DataType::Timestamp(TimeUnit::Second, None),
+                            &DEFAULT_DATAFUSION_CAST_OPTIONS,
+                        )
+                    }
+                }
+                Ok(DataType::Utf8) => datetime_expressions::to_timestamp_seconds,
+                other => {
+                    return Err(DataFusionError::Internal(format!(
+                        "Unsupported data type {:?} for function to_timestamp_seconds",
+                        other,
+                    )))
+                }
+            }
+        }),
+        // These don't need args and input schema
+        _ => create_physical_fun(fun, ctx_state)?,
+    };
     let args = coerce(args, input_schema, &signature(fun))?;
 
     let arg_types = args
@@ -1026,7 +1136,46 @@ fn signature(fun: &BuiltinScalarFunction) -> Signature {
             Signature::Exact(vec![DataType::Utf8, DataType::Int64]),
             Signature::Exact(vec![DataType::LargeUtf8, DataType::Int64]),
         ]),
-        BuiltinScalarFunction::ToTimestamp => Signature::Uniform(1, vec![DataType::Utf8]),
+        BuiltinScalarFunction::ToTimestamp => Signature::Uniform(
+            1,
+            vec![
+                DataType::Utf8,
+                DataType::Int64,
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                DataType::Timestamp(TimeUnit::Microsecond, None),
+                DataType::Timestamp(TimeUnit::Second, None),
+            ],
+        ),
+        BuiltinScalarFunction::ToTimestampMillis => Signature::Uniform(
+            1,
+            vec![
+                DataType::Utf8,
+                DataType::Int64,
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                DataType::Timestamp(TimeUnit::Microsecond, None),
+                DataType::Timestamp(TimeUnit::Second, None),
+            ],
+        ),
+        BuiltinScalarFunction::ToTimestampMicros => Signature::Uniform(
+            1,
+            vec![
+                DataType::Utf8,
+                DataType::Int64,
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                DataType::Timestamp(TimeUnit::Second, None),
+            ],
+        ),
+        BuiltinScalarFunction::ToTimestampSeconds => Signature::Uniform(
+            1,
+            vec![
+                DataType::Utf8,
+                DataType::Int64,
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                DataType::Timestamp(TimeUnit::Microsecond, None),
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+            ],
+        ),
         BuiltinScalarFunction::DateTrunc => Signature::Exact(vec![
             DataType::Utf8,
             DataType::Timestamp(TimeUnit::Nanosecond, None),
diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs
index c7afbf5..c23674b 100644
--- a/datafusion/src/scalar.rs
+++ b/datafusion/src/scalar.rs
@@ -900,7 +900,10 @@ impl TryFrom<ScalarValue> for i64 {
     fn try_from(value: ScalarValue) -> Result<Self> {
         match value {
             ScalarValue::Int64(Some(inner_value))
-            | ScalarValue::TimestampNanosecond(Some(inner_value)) => Ok(inner_value),
+            | ScalarValue::TimestampNanosecond(Some(inner_value))
+            | ScalarValue::TimestampMicrosecond(Some(inner_value))
+            | ScalarValue::TimestampMillisecond(Some(inner_value))
+            | ScalarValue::TimestampSecond(Some(inner_value)) => Ok(inner_value),
             _ => Err(DataFusionError::Internal(format!(
                 "Cannot convert {:?} to {}",
                 value,
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 21da793..b6393e9 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -27,7 +27,11 @@ extern crate datafusion;
 use arrow::{array::*, datatypes::TimeUnit};
 use arrow::{datatypes::Int32Type, datatypes::Int64Type, record_batch::RecordBatch};
 use arrow::{
-    datatypes::{DataType, Field, Schema, SchemaRef},
+    datatypes::{
+        ArrowNativeType, ArrowPrimitiveType, ArrowTimestampType, DataType, Field, Schema,
+        SchemaRef, TimestampMicrosecondType, TimestampMillisecondType,
+        TimestampNanosecondType, TimestampSecondType,
+    },
     util::display::array_value_to_string,
 };
 
@@ -1024,6 +1028,188 @@ async fn csv_query_cast_literal() -> Result<()> {
 }
 
 #[tokio::test]
+async fn query_cast_timestamp_millis() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+
+    let t1_schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, true)]));
+    let t1_data = RecordBatch::try_new(
+        t1_schema.clone(),
+        vec![Arc::new(Int64Array::from(vec![
+            1235865600000,
+            1235865660000,
+            1238544000000,
+        ]))],
+    )?;
+    let t1_table = MemTable::try_new(t1_schema, vec![vec![t1_data]])?;
+    ctx.register_table("t1", Arc::new(t1_table))?;
+
+    let sql = "SELECT to_timestamp_millis(ts) FROM t1 LIMIT 3";
+    let actual = execute(&mut ctx, sql).await;
+    let expected = vec![
+        vec!["2009-03-01 00:00:00"],
+        vec!["2009-03-01 00:01:00"],
+        vec!["2009-04-01 00:00:00"],
+    ];
+    assert_eq!(expected, actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn query_cast_timestamp_micros() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+
+    let t1_schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, true)]));
+    let t1_data = RecordBatch::try_new(
+        t1_schema.clone(),
+        vec![Arc::new(Int64Array::from(vec![
+            1235865600000000,
+            1235865660000000,
+            1238544000000000,
+        ]))],
+    )?;
+    let t1_table = MemTable::try_new(t1_schema, vec![vec![t1_data]])?;
+    ctx.register_table("t1", Arc::new(t1_table))?;
+
+    let sql = "SELECT to_timestamp_micros(ts) FROM t1 LIMIT 3";
+    let actual = execute(&mut ctx, sql).await;
+    let expected = vec![
+        vec!["2009-03-01 00:00:00"],
+        vec!["2009-03-01 00:01:00"],
+        vec!["2009-04-01 00:00:00"],
+    ];
+    assert_eq!(expected, actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn query_cast_timestamp_seconds() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+
+    let t1_schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, true)]));
+    let t1_data = RecordBatch::try_new(
+        t1_schema.clone(),
+        vec![Arc::new(Int64Array::from(vec![
+            1235865600, 1235865660, 1238544000,
+        ]))],
+    )?;
+    let t1_table = MemTable::try_new(t1_schema, vec![vec![t1_data]])?;
+    ctx.register_table("t1", Arc::new(t1_table))?;
+
+    let sql = "SELECT to_timestamp_seconds(ts) FROM t1 LIMIT 3";
+    let actual = execute(&mut ctx, sql).await;
+    let expected = vec![
+        vec!["2009-03-01 00:00:00"],
+        vec!["2009-03-01 00:01:00"],
+        vec!["2009-04-01 00:00:00"],
+    ];
+    assert_eq!(expected, actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn query_cast_timestamp_nanos_to_others() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("ts_data", make_timestamp_nano_table()?)?;
+
+    // Original column is nanos, convert to millis and check timestamp
+    let sql = "SELECT to_timestamp_millis(ts) FROM ts_data LIMIT 3";
+    let actual = execute(&mut ctx, sql).await;
+    let expected = vec![
+        vec!["2020-09-08 13:42:29.190"],
+        vec!["2020-09-08 12:42:29.190"],
+        vec!["2020-09-08 11:42:29.190"],
+    ];
+    assert_eq!(expected, actual);
+
+    let sql = "SELECT to_timestamp_micros(ts) FROM ts_data LIMIT 3";
+    let actual = execute(&mut ctx, sql).await;
+    let expected = vec![
+        vec!["2020-09-08 13:42:29.190855"],
+        vec!["2020-09-08 12:42:29.190855"],
+        vec!["2020-09-08 11:42:29.190855"],
+    ];
+    assert_eq!(expected, actual);
+
+    let sql = "SELECT to_timestamp_seconds(ts) FROM ts_data LIMIT 3";
+    let actual = execute(&mut ctx, sql).await;
+    let expected = vec![
+        vec!["2020-09-08 13:42:29"],
+        vec!["2020-09-08 12:42:29"],
+        vec!["2020-09-08 11:42:29"],
+    ];
+    assert_eq!(expected, actual);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn query_cast_timestamp_seconds_to_others() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("ts_secs", make_timestamp_table::<TimestampSecondType>()?)?;
+
+    // Original column is seconds, convert to millis and check timestamp
+    let sql = "SELECT to_timestamp_millis(ts) FROM ts_secs LIMIT 3";
+    let actual = execute(&mut ctx, sql).await;
+    let expected = vec![
+        vec!["2020-09-08 13:42:29"],
+        vec!["2020-09-08 12:42:29"],
+        vec!["2020-09-08 11:42:29"],
+    ];
+    assert_eq!(expected, actual);
+
+    // Original column is seconds, convert to micros and check timestamp
+    let sql = "SELECT to_timestamp_micros(ts) FROM ts_secs LIMIT 3";
+    let actual = execute(&mut ctx, sql).await;
+    assert_eq!(expected, actual);
+
+    // to nanos
+    let sql = "SELECT to_timestamp(ts) FROM ts_secs LIMIT 3";
+    let actual = execute(&mut ctx, sql).await;
+    assert_eq!(expected, actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn query_cast_timestamp_micros_to_others() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table(
+        "ts_micros",
+        make_timestamp_table::<TimestampMicrosecondType>()?,
+    )?;
+
+    // Original column is micros, convert to millis and check timestamp
+    let sql = "SELECT to_timestamp_millis(ts) FROM ts_micros LIMIT 3";
+    let actual = execute(&mut ctx, sql).await;
+    let expected = vec![
+        vec!["2020-09-08 13:42:29.190"],
+        vec!["2020-09-08 12:42:29.190"],
+        vec!["2020-09-08 11:42:29.190"],
+    ];
+    assert_eq!(expected, actual);
+
+    // Original column is micros, convert to seconds and check timestamp
+    let sql = "SELECT to_timestamp_seconds(ts) FROM ts_micros LIMIT 3";
+    let actual = execute(&mut ctx, sql).await;
+    let expected = vec![
+        vec!["2020-09-08 13:42:29"],
+        vec!["2020-09-08 12:42:29"],
+        vec!["2020-09-08 11:42:29"],
+    ];
+    assert_eq!(expected, actual);
+
+    // Original column is micros, convert to nanos and check timestamp
+    let sql = "SELECT to_timestamp(ts) FROM ts_micros LIMIT 3";
+    let actual = execute(&mut ctx, sql).await;
+    let expected = vec![
+        vec!["2020-09-08 13:42:29.190855"],
+        vec!["2020-09-08 12:42:29.190855"],
+        vec!["2020-09-08 11:42:29.190855"],
+    ];
+    assert_eq!(expected, actual);
+    Ok(())
+}
+
+#[tokio::test]
 async fn union_all() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     let sql = "SELECT 1 as x UNION ALL SELECT 2 as x";
@@ -2439,17 +2625,33 @@ async fn like() -> Result<()> {
     Ok(())
 }
 
-fn make_timestamp_nano_table() -> Result<Arc<MemTable>> {
+fn make_timestamp_table<A>() -> Result<Arc<MemTable>>
+where
+    A: ArrowTimestampType,
+{
     let schema = Arc::new(Schema::new(vec![
-        Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
+        Field::new("ts", DataType::Timestamp(A::get_time_unit(), None), false),
         Field::new("value", DataType::Int32, true),
     ]));
 
-    let mut builder = TimestampNanosecondArray::builder(3);
-
-    builder.append_value(1599572549190855000)?; // 2020-09-08T13:42:29.190855+00:00
-    builder.append_value(1599568949190855000)?; // 2020-09-08T12:42:29.190855+00:00
-    builder.append_value(1599565349190855000)?; // 2020-09-08T11:42:29.190855+00:00
+    let mut builder = PrimitiveBuilder::<A>::new(3);
+
+    let nanotimestamps = vec![
+        1599572549190855000i64, // 2020-09-08T13:42:29.190855+00:00
+        1599568949190855000,    // 2020-09-08T12:42:29.190855+00:00
+        1599565349190855000,    // 2020-09-08T11:42:29.190855+00:00
+    ];
+    let divisor = match A::get_time_unit() {
+        TimeUnit::Nanosecond => 1,
+        TimeUnit::Microsecond => 1000,
+        TimeUnit::Millisecond => 1_000_000,
+        TimeUnit::Second => 1_000_000_000,
+    };
+    for ts in nanotimestamps {
+        builder.append_value(
+            <A as ArrowPrimitiveType>::Native::from_i64(ts / divisor).unwrap(),
+        )?;
+    }
 
     let data = RecordBatch::try_new(
         schema.clone(),
@@ -2462,6 +2664,10 @@ fn make_timestamp_nano_table() -> Result<Arc<MemTable>> {
     Ok(Arc::new(table))
 }
 
+fn make_timestamp_nano_table() -> Result<Arc<MemTable>> {
+    make_timestamp_table::<TimestampNanosecondType>()
+}
+
 #[tokio::test]
 async fn to_timestamp() -> Result<()> {
     let mut ctx = ExecutionContext::new();
@@ -2476,6 +2682,51 @@ async fn to_timestamp() -> Result<()> {
 }
 
 #[tokio::test]
+async fn to_timestamp_millis() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table(
+        "ts_data",
+        make_timestamp_table::<TimestampMillisecondType>()?,
+    )?;
+
+    let sql = "SELECT COUNT(*) FROM ts_data where ts > to_timestamp_millis('2020-09-08T12:00:00+00:00')";
+    let actual = execute(&mut ctx, sql).await;
+
+    let expected = vec![vec!["2"]];
+    assert_eq!(expected, actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn to_timestamp_micros() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table(
+        "ts_data",
+        make_timestamp_table::<TimestampMicrosecondType>()?,
+    )?;
+
+    let sql = "SELECT COUNT(*) FROM ts_data where ts > to_timestamp_micros('2020-09-08T12:00:00+00:00')";
+    let actual = execute(&mut ctx, sql).await;
+
+    let expected = vec![vec!["2"]];
+    assert_eq!(expected, actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn to_timestamp_seconds() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("ts_data", make_timestamp_table::<TimestampSecondType>()?)?;
+
+    let sql = "SELECT COUNT(*) FROM ts_data where ts > to_timestamp_seconds('2020-09-08T12:00:00+00:00')";
+    let actual = execute(&mut ctx, sql).await;
+
+    let expected = vec![vec!["2"]];
+    assert_eq!(expected, actual);
+    Ok(())
+}
+
+#[tokio::test]
 async fn count_distinct_timestamps() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     ctx.register_table("ts_data", make_timestamp_nano_table()?)?;
diff --git a/docs/user-guide/src/SUMMARY.md b/docs/user-guide/src/SUMMARY.md
index aa101b3..516fcce 100644
--- a/docs/user-guide/src/SUMMARY.md
+++ b/docs/user-guide/src/SUMMARY.md
@@ -27,6 +27,7 @@
   - [SELECT](sql/select.md)
   - [DDL](sql/ddl.md)
     - [CREATE EXTERNAL TABLE](sql/ddl.md)
+  - [Datafusion Specific Functions](sql/datafusion-functions.md)
 
 - [Distributed](distributed/introduction.md)
   - [Create a Ballista Cluster](distributed/deployment.md)
diff --git a/docs/user-guide/src/sql/datafusion-functions.md b/docs/user-guide/src/sql/datafusion-functions.md
new file mode 100644
index 0000000..8431baf
--- /dev/null
+++ b/docs/user-guide/src/sql/datafusion-functions.md
@@ -0,0 +1,86 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# DataFusion-Specific Functions
+
+The SQL functions on this page are either unique to DataFusion or are well-known functions whose behavior is specific to DataFusion. In particular, the `to_timestamp_xx()` functions exist because Arrow supports multiple timestamp resolutions.
+
+## `to_timestamp`
+
+`to_timestamp()` is similar to the standard SQL function. It converts to type `Timestamp(Nanosecond, None)` from:
+
+- Timestamp strings
+  - `1997-01-31T09:26:56.123Z` # RFC3339
+  - `1997-01-31T09:26:56.123-05:00` # RFC3339
+  - `1997-01-31 09:26:56.123-05:00` # close to RFC3339 but with a space rather than T
+  - `1997-01-31T09:26:56.123` # close to RFC3339 but no timezone offset specified
+  - `1997-01-31 09:26:56.123` # close to RFC3339 but uses a space and no timezone offset
+  - `1997-01-31 09:26:56` # close to RFC3339, no fractional seconds
+- An Int64 array/column, values are nanoseconds since Epoch UTC
+- Other Timestamp() columns or values
+
+Note that conversions from other Timestamp and Int64 types can also be performed using `CAST(.. AS Timestamp)`. However, the conversion functionality here is present for consistency with the other `to_timestamp_xx()` functions.
+
+## `to_timestamp_millis`
+
+`to_timestamp_millis()` converts to type `Timestamp(Millisecond, None)` from:
+
+- Timestamp strings, in the same formats supported by `to_timestamp()` (except that the output has millisecond resolution)
+  - `1997-01-31T09:26:56.123Z` # RFC3339
+  - `1997-01-31T09:26:56.123-05:00` # RFC3339
+  - `1997-01-31 09:26:56.123-05:00` # close to RFC3339 but with a space rather than T
+  - `1997-01-31T09:26:56.123` # close to RFC3339 but no timezone offset specified
+  - `1997-01-31 09:26:56.123` # close to RFC3339 but uses a space and no timezone offset
+  - `1997-01-31 09:26:56` # close to RFC3339, no fractional seconds
+- An Int64 array/column, values are milliseconds since Epoch UTC
+- Other Timestamp() columns or values
+
+Note that `CAST(.. AS Timestamp)` converts to Timestamps with Nanosecond resolution; this function is the only way to convert/cast to millisecond resolution.
+
+## `to_timestamp_micros`
+
+`to_timestamp_micros()` converts to type `Timestamp(Microsecond, None)` from:
+
+- Timestamp strings, in the same formats supported by `to_timestamp()` (except that the output has microsecond resolution)
+  - `1997-01-31T09:26:56.123Z` # RFC3339
+  - `1997-01-31T09:26:56.123-05:00` # RFC3339
+  - `1997-01-31 09:26:56.123-05:00` # close to RFC3339 but with a space rather than T
+  - `1997-01-31T09:26:56.123` # close to RFC3339 but no timezone offset specified
+  - `1997-01-31 09:26:56.123` # close to RFC3339 but uses a space and no timezone offset
+  - `1997-01-31 09:26:56` # close to RFC3339, no fractional seconds
+- An Int64 array/column, values are microseconds since Epoch UTC
+- Other Timestamp() columns or values
+
+Note that `CAST(.. AS Timestamp)` converts to Timestamps with Nanosecond resolution; this function is the only way to convert/cast to microsecond resolution.
+
+## `to_timestamp_seconds`
+
+`to_timestamp_seconds()` converts to type `Timestamp(Second, None)` from:
+
+- Timestamp strings, in the same formats supported by `to_timestamp()` (except that the output has second resolution)
+  - `1997-01-31T09:26:56.123Z` # RFC3339
+  - `1997-01-31T09:26:56.123-05:00` # RFC3339
+  - `1997-01-31 09:26:56.123-05:00` # close to RFC3339 but with a space rather than T
+  - `1997-01-31T09:26:56.123` # close to RFC3339 but no timezone offset specified
+  - `1997-01-31 09:26:56.123` # close to RFC3339 but uses a space and no timezone offset
+  - `1997-01-31 09:26:56` # close to RFC3339, no fractional seconds
+- An Int64 array/column, values are seconds since Epoch UTC
+- Other Timestamp() columns or values
+
+Note that `CAST(.. AS Timestamp)` converts to Timestamps with Nanosecond resolution; this function is the only way to convert/cast to seconds resolution.
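
Illustrative note (not part of the commit): the docs above say each function interprets an Int64 value in its own unit since the epoch, and the `make_timestamp_table` test helper derives those values by dividing a nanosecond timestamp by a per-unit divisor. The sketch below shows that arithmetic using only the standard library. The `Unit` enum and both function names are hypothetical stand-ins, not Arrow or DataFusion APIs, and the truncating division mirrors the test helper rather than any documented cast behavior.

```rust
/// Hypothetical stand-in for Arrow's `TimeUnit`; this is NOT the real type.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Unit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

/// Number of nanoseconds in one tick of the given unit
/// (the same divisors used by the test helper in this diff).
fn nanos_per_tick(unit: Unit) -> i64 {
    match unit {
        Unit::Nanosecond => 1,
        Unit::Microsecond => 1_000,
        Unit::Millisecond => 1_000_000,
        Unit::Second => 1_000_000_000,
    }
}

/// Rescale a nanoseconds-since-epoch value to a coarser unit.
/// Uses plain integer division, so sub-unit precision is dropped
/// (truncation toward zero); this is an assumption of the sketch.
fn rescale_from_nanos(ts_nanos: i64, unit: Unit) -> i64 {
    ts_nanos / nanos_per_tick(unit)
}
```

For the first test timestamp, `rescale_from_nanos(1599572549190855000, Unit::Millisecond)` yields `1599572549190`, which is consistent with the `2020-09-08 13:42:29.190` value the millisecond tests expect.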