Posted to commits@arrow.apache.org by al...@apache.org on 2021/06/19 11:06:55 UTC
[arrow-datafusion] branch master updated: `to_timestamp_millis()`,
`to_timestamp_micros()`, `to_timestamp_seconds()` (#567)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 5900b4c `to_timestamp_millis()`, `to_timestamp_micros()`, `to_timestamp_seconds()` (#567)
5900b4c is described below
commit 5900b4c6829b0bdbe69e1f95fb74e935bc8f33d4
Author: Evan Chan <ve...@gmail.com>
AuthorDate: Sat Jun 19 04:06:47 2021 -0700
`to_timestamp_millis()`, `to_timestamp_micros()`, `to_timestamp_seconds()` (#567)
* to_timestamp_millis(): support casting to Timestamp(Milliseconds, _) from Int64
* Add testing setup to instructions
* to_timestamp_millis(): Convert timestamp strings to TimestampMillis
* [functions] Add to_timestamp_micros() and to_timestamp_seconds() functions
* Update datafusion/tests/sql.rs
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
* CR feedback and fix build
* Add ability for to_timestamp_xxx() functions to cast from other Timestamp types
* For consistency, let to_timestamp() also perform casts
* Prettier / clippy
* Add docs for to_timestamp() functions
Co-authored-by: Evan Chan <ev...@urbanlogiq.com>
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
DEVELOPERS.md | 9 +-
README.md | 54 +++--
datafusion/Cargo.toml | 4 +-
.../src/physical_plan/datetime_expressions.rs | 32 ++-
datafusion/src/physical_plan/expressions/binary.rs | 19 ++
datafusion/src/physical_plan/expressions/cast.rs | 38 +--
datafusion/src/physical_plan/expressions/mod.rs | 4 +-
datafusion/src/physical_plan/expressions/nullif.rs | 6 +-
datafusion/src/physical_plan/functions.rs | 161 ++++++++++++-
datafusion/src/scalar.rs | 5 +-
datafusion/tests/sql.rs | 267 ++++++++++++++++++++-
docs/user-guide/src/SUMMARY.md | 1 +
docs/user-guide/src/sql/datafusion-functions.md | 86 +++++++
13 files changed, 618 insertions(+), 68 deletions(-)
diff --git a/DEVELOPERS.md b/DEVELOPERS.md
index cd0792f..8538468 100644
--- a/DEVELOPERS.md
+++ b/DEVELOPERS.md
@@ -33,6 +33,13 @@ DataFusion is written in Rust and it uses a standard rust toolkit:
- `cargo test` to test
- etc.
+Testing setup:
+
+- `git submodule init`
+- `git submodule update`
+- `export PARQUET_TEST_DATA=parquet-testing/data/`
+- `export ARROW_TEST_DATA=testing/data/`
+
## How to add a new scalar function
Below is a checklist of what you need to do to add a new scalar function to DataFusion:
@@ -47,7 +54,7 @@ Below is a checklist of what you need to do to add a new scalar function to Data
- a new entry to `FromStr` with the name of the function as called by SQL
- a new line in `return_type` with the expected return type of the function, given an incoming type
- a new line in `signature` with the signature of the function (number and types of its arguments)
- - a new line in `create_physical_expr` mapping the built-in to the implementation
+ - a new line in `create_physical_expr`/`create_physical_fun` mapping the built-in to the implementation
- tests to the function.
- In [tests/sql.rs](datafusion/tests/sql.rs), add a new test where the function is called through SQL against well known data and returns the expected result.
- In [src/logical_plan/expr](datafusion/src/logical_plan/expr.rs), add:
diff --git a/README.md b/README.md
index 730bbc3..195d1a7 100644
--- a/README.md
+++ b/README.md
@@ -197,6 +197,10 @@ DataFusion also includes a simple command-line interactive SQL utility. See the
- [ ] Basic date functions
- [ ] Basic time functions
- [x] Basic timestamp functions
+ - [x] [to_timestamp](docs/user-guide/book/sql/datafusion-functions.html#to_timestamp)
+ - [x] [to_timestamp_millis](docs/user-guide/book/sql/datafusion-functions.html#to_timestamp_millis)
+ - [x] [to_timestamp_micros](docs/user-guide/book/sql/datafusion-functions.html#to_timestamp_micros)
+ - [x] [to_timestamp_seconds](docs/user-guide/book/sql/datafusion-functions.html#to_timestamp_seconds)
- nested functions
- [x] Array of columns
- [x] Schema Queries
@@ -320,31 +324,31 @@ execution. The SQL types from
[sqlparser-rs](https://github.com/ballista-compute/sqlparser-rs/blob/main/src/ast/data_type.rs#L57)
are mapped to Arrow types according to the following table
-| SQL Data Type | Arrow DataType |
-| ------------- | ------------------------------- |
-| `CHAR` | `Utf8` |
-| `VARCHAR` | `Utf8` |
-| `UUID` | _Not yet supported_ |
-| `CLOB` | _Not yet supported_ |
-| `BINARY` | _Not yet supported_ |
-| `VARBINARY` | _Not yet supported_ |
-| `DECIMAL` | `Float64` |
-| `FLOAT` | `Float32` |
-| `SMALLINT` | `Int16` |
-| `INT` | `Int32` |
-| `BIGINT` | `Int64` |
-| `REAL` | `Float64` |
-| `DOUBLE` | `Float64` |
-| `BOOLEAN` | `Boolean` |
-| `DATE` | `Date32` |
-| `TIME` | `Time64(TimeUnit::Millisecond)` |
-| `TIMESTAMP` | `Date64` |
-| `INTERVAL` | _Not yet supported_ |
-| `REGCLASS` | _Not yet supported_ |
-| `TEXT` | _Not yet supported_ |
-| `BYTEA` | _Not yet supported_ |
-| `CUSTOM` | _Not yet supported_ |
-| `ARRAY` | _Not yet supported_ |
+| SQL Data Type | Arrow DataType |
+| ------------- | --------------------------------- |
+| `CHAR` | `Utf8` |
+| `VARCHAR` | `Utf8` |
+| `UUID` | _Not yet supported_ |
+| `CLOB` | _Not yet supported_ |
+| `BINARY` | _Not yet supported_ |
+| `VARBINARY` | _Not yet supported_ |
+| `DECIMAL` | `Float64` |
+| `FLOAT` | `Float32` |
+| `SMALLINT` | `Int16` |
+| `INT` | `Int32` |
+| `BIGINT` | `Int64` |
+| `REAL` | `Float64` |
+| `DOUBLE` | `Float64` |
+| `BOOLEAN` | `Boolean` |
+| `DATE` | `Date32` |
+| `TIME` | `Time64(TimeUnit::Millisecond)` |
+| `TIMESTAMP` | `Timestamp(TimeUnit::Nanosecond)` |
+| `INTERVAL` | _Not yet supported_ |
+| `REGCLASS` | _Not yet supported_ |
+| `TEXT` | _Not yet supported_ |
+| `BYTEA` | _Not yet supported_ |
+| `CUSTOM` | _Not yet supported_ |
+| `ARRAY` | _Not yet supported_ |
# Architecture Overview
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index 0668ec0..5da2469 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -46,8 +46,8 @@ unicode_expressions = ["unicode-segmentation"]
[dependencies]
ahash = "0.7"
hashbrown = "0.11"
-arrow = { version = "4.0", features = ["prettyprint"] }
-parquet = { version = "4.0", features = ["arrow"] }
+arrow = { version = "4.3", features = ["prettyprint"] }
+parquet = { version = "4.3", features = ["arrow"] }
sqlparser = "0.9.0"
paste = "^1.0"
num_cpus = "1.13.0"
diff --git a/datafusion/src/physical_plan/datetime_expressions.rs b/datafusion/src/physical_plan/datetime_expressions.rs
index ec52e6b..e17ded2 100644
--- a/datafusion/src/physical_plan/datetime_expressions.rs
+++ b/datafusion/src/physical_plan/datetime_expressions.rs
@@ -25,7 +25,10 @@ use crate::{
};
use arrow::{
array::{Array, ArrayRef, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait},
- datatypes::{ArrowPrimitiveType, DataType, TimestampNanosecondType},
+ datatypes::{
+ ArrowPrimitiveType, DataType, TimestampMicrosecondType, TimestampMillisecondType,
+ TimestampNanosecondType, TimestampSecondType,
+ },
};
use arrow::{
array::{
@@ -268,6 +271,33 @@ pub fn to_timestamp(args: &[ColumnarValue]) -> Result<ColumnarValue> {
)
}
+/// to_timestamp_millis SQL function
+pub fn to_timestamp_millis(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ handle::<TimestampMillisecondType, _, TimestampMillisecondType>(
+ args,
+ |s| string_to_timestamp_nanos(s).map(|n| n / 1_000_000),
+ "to_timestamp_millis",
+ )
+}
+
+/// to_timestamp_micros SQL function
+pub fn to_timestamp_micros(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ handle::<TimestampMicrosecondType, _, TimestampMicrosecondType>(
+ args,
+ |s| string_to_timestamp_nanos(s).map(|n| n / 1_000),
+ "to_timestamp_micros",
+ )
+}
+
+/// to_timestamp_seconds SQL function
+pub fn to_timestamp_seconds(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ handle::<TimestampSecondType, _, TimestampSecondType>(
+ args,
+ |s| string_to_timestamp_nanos(s).map(|n| n / 1_000_000_000),
+ "to_timestamp_seconds",
+ )
+}
+
/// Create an implementation of `now()` that always returns the
/// specified timestamp.
///
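The three new string paths parse the input to nanoseconds with `string_to_timestamp_nanos` and then integer-divide down to the target unit. The sketch below isolates only that scaling step. It is plain Rust with no DataFusion types, and the helper names are hypothetical. It shows one consequence of the approach: Rust's `/` truncates toward zero, so for pre-1970 (negative) values the result rounds toward the epoch rather than down. `div_euclid` appears only as a flooring alternative for comparison. It is not what the diff does.

```rust
// Nanoseconds in one microsecond, millisecond and second.
const NANOS_PER_MICRO: i64 = 1_000;
const NANOS_PER_MILLI: i64 = 1_000_000;
const NANOS_PER_SECOND: i64 = 1_000_000_000;

/// Same arithmetic as `.map(|n| n / divisor)`: truncates toward zero.
fn scale_truncating(nanos: i64, divisor: i64) -> i64 {
    nanos / divisor
}

/// Flooring alternative: rounds toward negative infinity (divisor > 0).
fn scale_flooring(nanos: i64, divisor: i64) -> i64 {
    nanos.div_euclid(divisor)
}

fn main() {
    // 2020-09-08T13:42:29.190855Z, the first value used in the sql.rs test tables.
    let ts = 1_599_572_549_190_855_000_i64;
    println!("millis:  {}", scale_truncating(ts, NANOS_PER_MILLI));
    println!("micros:  {}", scale_truncating(ts, NANOS_PER_MICRO));
    println!("seconds: {}", scale_truncating(ts, NANOS_PER_SECOND));

    // 1.5 ms before the epoch: truncation and flooring disagree.
    let before_epoch = -1_500_000_i64;
    println!(
        "truncating: {}, flooring: {}",
        scale_truncating(before_epoch, NANOS_PER_MILLI),
        scale_flooring(before_epoch, NANOS_PER_MILLI)
    );
}
```

For post-1970 data the two agree. They only diverge when the sub-unit remainder of a negative value is non-zero.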
diff --git a/datafusion/src/physical_plan/expressions/binary.rs b/datafusion/src/physical_plan/expressions/binary.rs
index 5c2d9ce..5ed0c74 100644
--- a/datafusion/src/physical_plan/expressions/binary.rs
+++ b/datafusion/src/physical_plan/expressions/binary.rs
@@ -17,6 +17,7 @@
use std::{any::Any, sync::Arc};
+use arrow::array::TimestampMillisecondArray;
use arrow::array::*;
use arrow::compute::kernels::arithmetic::{
add, divide, divide_scalar, multiply, subtract,
@@ -256,6 +257,15 @@ macro_rules! binary_array_op_scalar {
DataType::Timestamp(TimeUnit::Nanosecond, None) => {
compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
}
+ DataType::Timestamp(TimeUnit::Microsecond, None) => {
+ compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray)
+ }
+ DataType::Timestamp(TimeUnit::Millisecond, None) => {
+ compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMillisecondArray)
+ }
+ DataType::Timestamp(TimeUnit::Second, None) => {
+ compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampSecondArray)
+ }
DataType::Date32 => {
compute_op_scalar!($LEFT, $RIGHT, $OP, Date32Array)
}
@@ -288,6 +298,15 @@ macro_rules! binary_array_op {
DataType::Timestamp(TimeUnit::Nanosecond, None) => {
compute_op!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
}
+ DataType::Timestamp(TimeUnit::Microsecond, None) => {
+ compute_op!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray)
+ }
+ DataType::Timestamp(TimeUnit::Millisecond, None) => {
+ compute_op!($LEFT, $RIGHT, $OP, TimestampMillisecondArray)
+ }
+ DataType::Timestamp(TimeUnit::Second, None) => {
+ compute_op!($LEFT, $RIGHT, $OP, TimestampSecondArray)
+ }
DataType::Date32 => {
compute_op!($LEFT, $RIGHT, $OP, Date32Array)
}
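Each new arm in `binary_array_op!` and `binary_array_op_scalar!` treats the operands as the array type named in that arm, for example `TimestampMillisecondArray`. The operands are therefore expected to share a unit before the comparison runs. The SQL tests accordingly compare a millisecond `ts` column with `to_timestamp_millis(...)`. The small model below shows why the unit matters when only raw `i64` values are compared. It is hypothetical and is not DataFusion code.

```rust
// Hypothetical model: a timestamp is an i64 count plus a unit.
#[derive(Debug, Clone, Copy, PartialEq)]
enum TimeUnit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

impl TimeUnit {
    /// How many ticks of this unit make up one second.
    fn ticks_per_second(self) -> i64 {
        match self {
            TimeUnit::Second => 1,
            TimeUnit::Millisecond => 1_000,
            TimeUnit::Microsecond => 1_000_000,
            TimeUnit::Nanosecond => 1_000_000_000,
        }
    }
}

#[derive(Debug, Clone, Copy)]
struct Ts {
    value: i64,
    unit: TimeUnit,
}

/// Compares the raw i64 payloads and ignores units (the mistake to avoid).
fn gt_raw(a: Ts, b: Ts) -> bool {
    a.value > b.value
}

/// Rescales both sides to the finer unit first; i128 avoids overflow.
fn gt_normalized(a: Ts, b: Ts) -> bool {
    let finest = a.unit.ticks_per_second().max(b.unit.ticks_per_second());
    let scale = |t: Ts| t.value as i128 * (finest / t.unit.ticks_per_second()) as i128;
    scale(a) > scale(b)
}

fn main() {
    // 2020-09-08T13:42:29 in seconds vs 2020-09-08T12:00:00 in milliseconds.
    let a = Ts { value: 1_599_572_549, unit: TimeUnit::Second };
    let b = Ts { value: 1_599_566_400_000, unit: TimeUnit::Millisecond };
    println!("raw: {}, normalized: {}", gt_raw(a, b), gt_normalized(a, b));
}
```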
diff --git a/datafusion/src/physical_plan/expressions/cast.rs b/datafusion/src/physical_plan/expressions/cast.rs
index ba395f5..558b1e5 100644
--- a/datafusion/src/physical_plan/expressions/cast.rs
+++ b/datafusion/src/physical_plan/expressions/cast.rs
@@ -91,24 +91,26 @@ impl PhysicalExpr for CastExpr {
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let value = self.expr.evaluate(batch)?;
- match value {
- ColumnarValue::Array(array) => {
- Ok(ColumnarValue::Array(kernels::cast::cast_with_options(
- &array,
- &self.cast_type,
- &self.cast_options,
- )?))
- }
- ColumnarValue::Scalar(scalar) => {
- let scalar_array = scalar.to_array();
- let cast_array = kernels::cast::cast_with_options(
- &scalar_array,
- &self.cast_type,
- &self.cast_options,
- )?;
- let cast_scalar = ScalarValue::try_from_array(&cast_array, 0)?;
- Ok(ColumnarValue::Scalar(cast_scalar))
- }
+ cast_column(&value, &self.cast_type, &self.cast_options)
+ }
+}
+
+/// Internal cast function for casting ColumnarValue -> ColumnarValue for cast_type
+pub fn cast_column(
+ value: &ColumnarValue,
+ cast_type: &DataType,
+ cast_options: &CastOptions,
+) -> Result<ColumnarValue> {
+ match value {
+ ColumnarValue::Array(array) => Ok(ColumnarValue::Array(
+ kernels::cast::cast_with_options(array, cast_type, cast_options)?,
+ )),
+ ColumnarValue::Scalar(scalar) => {
+ let scalar_array = scalar.to_array();
+ let cast_array =
+ kernels::cast::cast_with_options(&scalar_array, cast_type, cast_options)?;
+ let cast_scalar = ScalarValue::try_from_array(&cast_array, 0)?;
+ Ok(ColumnarValue::Scalar(cast_scalar))
}
}
}
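The refactor moves the body of `CastExpr::evaluate` into a reusable `cast_column`. Its scalar branch wraps the scalar in a one-element array, runs the array cast kernel, and reads element 0 back. The self-contained sketch below reproduces the same pattern. It uses a hypothetical `ColumnarValue` over `Option<i64>`, and a hypothetical unit-rescaling "kernel" stands in for Arrow's `cast_with_options`.

```rust
/// Hypothetical stand-in for DataFusion's `ColumnarValue`: either a whole
/// column or a single (possibly null) value.
#[derive(Debug, Clone, PartialEq)]
enum ColumnarValue {
    Array(Vec<Option<i64>>),
    Scalar(Option<i64>),
}

/// Rescale one value between units given as ticks per second
/// (1, 1_000, 1_000_000 or 1_000_000_000).
fn rescale(x: i64, from_per_sec: i64, to_per_sec: i64) -> Result<i64, String> {
    if to_per_sec >= from_per_sec {
        // Finer target unit: multiply, reporting overflow as an error.
        x.checked_mul(to_per_sec / from_per_sec)
            .ok_or_else(|| format!("overflow rescaling {}", x))
    } else {
        // Coarser target unit: integer division (truncates toward zero).
        Ok(x / (from_per_sec / to_per_sec))
    }
}

/// Hypothetical "cast kernel" over a whole array; nulls stay null.
fn cast_array(values: &[Option<i64>], from: i64, to: i64) -> Result<Vec<Option<i64>>, String> {
    values
        .iter()
        .map(|&v| v.map(|x| rescale(x, from, to)).transpose())
        .collect()
}

/// Same shape as `cast_column` in the diff: arrays go straight to the
/// kernel; a scalar is wrapped in a one-element array, cast, and unwrapped.
fn cast_column(value: &ColumnarValue, from: i64, to: i64) -> Result<ColumnarValue, String> {
    match value {
        ColumnarValue::Array(values) => Ok(ColumnarValue::Array(cast_array(values, from, to)?)),
        ColumnarValue::Scalar(scalar) => {
            let one = cast_array(&[*scalar], from, to)?;
            Ok(ColumnarValue::Scalar(one[0]))
        }
    }
}

fn main() {
    let secs = ColumnarValue::Scalar(Some(1_599_572_549));
    println!("{:?}", cast_column(&secs, 1, 1_000));
    let millis = ColumnarValue::Array(vec![Some(1_599_572_549_190), None]);
    println!("{:?}", cast_column(&millis, 1_000, 1));
}
```

Reusing the array kernel for scalars keeps a single code path for the conversion logic. The cost is a tiny temporary allocation per scalar.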
diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs
index d18365c..f8cb40c 100644
--- a/datafusion/src/physical_plan/expressions/mod.rs
+++ b/datafusion/src/physical_plan/expressions/mod.rs
@@ -49,7 +49,9 @@ mod try_cast;
pub use average::{avg_return_type, Avg, AvgAccumulator};
pub use binary::{binary, binary_operator_data_type, BinaryExpr};
pub use case::{case, CaseExpr};
-pub use cast::{cast, cast_with_options, CastExpr};
+pub use cast::{
+ cast, cast_column, cast_with_options, CastExpr, DEFAULT_DATAFUSION_CAST_OPTIONS,
+};
pub use column::{col, Column};
pub use count::Count;
pub use in_list::{in_list, InListExpr};
diff --git a/datafusion/src/physical_plan/expressions/nullif.rs b/datafusion/src/physical_plan/expressions/nullif.rs
index 7cc58ed..55e7bda 100644
--- a/datafusion/src/physical_plan/expressions/nullif.rs
+++ b/datafusion/src/physical_plan/expressions/nullif.rs
@@ -21,11 +21,7 @@ use super::ColumnarValue;
use crate::error::{DataFusionError, Result};
use crate::scalar::ScalarValue;
use arrow::array::Array;
-use arrow::array::{
- ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
- Int16Array, Int32Array, Int64Array, Int8Array, StringArray, TimestampNanosecondArray,
- UInt16Array, UInt32Array, UInt64Array, UInt8Array,
-};
+use arrow::array::*;
use arrow::compute::kernels::boolean::nullif;
use arrow::compute::kernels::comparison::{eq, eq_scalar, eq_utf8, eq_utf8_scalar};
use arrow::datatypes::{DataType, TimeUnit};
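The `functions.rs` diff that follows wires each new function into the places named by the DEVELOPERS.md checklist: a `FromStr` entry for the SQL name, a `return_type` arm, and a `signature` arm. The cut-down sketch below models those three pieces. It uses hypothetical stand-in enums, not DataFusion's real `BuiltinScalarFunction` or Arrow's `DataType`.

```rust
use std::str::FromStr;

// Hypothetical stand-ins, reduced to what this sketch needs.
#[derive(Debug, Clone, Copy, PartialEq)]
enum TimeUnit {
    Second,
    Millisecond,
    Nanosecond,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Utf8,
    Int64,
    Timestamp(TimeUnit),
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum BuiltinScalarFunction {
    ToTimestamp,
    ToTimestampMillis,
}

// Checklist item: `FromStr` maps the SQL name to the variant.
impl FromStr for BuiltinScalarFunction {
    type Err = String;

    fn from_str(name: &str) -> Result<Self, Self::Err> {
        match name {
            "to_timestamp" => Ok(BuiltinScalarFunction::ToTimestamp),
            "to_timestamp_millis" => Ok(BuiltinScalarFunction::ToTimestampMillis),
            other => Err(format!("Unknown function: {}", other)),
        }
    }
}

// Checklist item: `return_type` gives the output type.
fn return_type(fun: BuiltinScalarFunction) -> DataType {
    match fun {
        BuiltinScalarFunction::ToTimestamp => DataType::Timestamp(TimeUnit::Nanosecond),
        BuiltinScalarFunction::ToTimestampMillis => DataType::Timestamp(TimeUnit::Millisecond),
    }
}

// Checklist item: `signature` lists the accepted argument types
// (one argument of any listed type, in the spirit of `Signature::Uniform`).
fn signature(fun: BuiltinScalarFunction) -> Vec<DataType> {
    let mut types = vec![DataType::Utf8, DataType::Int64, DataType::Timestamp(TimeUnit::Second)];
    // Mirroring the diff, a function's own output unit is not in its list.
    types.push(match fun {
        BuiltinScalarFunction::ToTimestamp => DataType::Timestamp(TimeUnit::Millisecond),
        BuiltinScalarFunction::ToTimestampMillis => DataType::Timestamp(TimeUnit::Nanosecond),
    });
    types
}

fn main() {
    let fun: BuiltinScalarFunction = "to_timestamp_millis".parse().unwrap();
    println!("{:?} returns {:?}", fun, return_type(fun));
    println!("accepts {:?}", signature(fun));
}
```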
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index 1e423c3..0e2be51 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -36,7 +36,9 @@ use super::{
use crate::execution::context::ExecutionContextState;
use crate::physical_plan::array_expressions;
use crate::physical_plan::datetime_expressions;
-use crate::physical_plan::expressions::{nullif_func, SUPPORTED_NULLIF_TYPES};
+use crate::physical_plan::expressions::{
+ cast_column, nullif_func, DEFAULT_DATAFUSION_CAST_OPTIONS, SUPPORTED_NULLIF_TYPES,
+};
use crate::physical_plan::math_expressions;
use crate::physical_plan::string_expressions;
use crate::{
@@ -205,6 +207,12 @@ pub enum BuiltinScalarFunction {
ToHex,
/// to_timestamp
ToTimestamp,
+ /// to_timestamp_millis
+ ToTimestampMillis,
+ /// to_timestamp_micros
+ ToTimestampMicros,
+ /// to_timestamp_seconds
+ ToTimestampSeconds,
///now
Now,
/// translate
@@ -298,6 +306,9 @@ impl FromStr for BuiltinScalarFunction {
"substr" => BuiltinScalarFunction::Substr,
"to_hex" => BuiltinScalarFunction::ToHex,
"to_timestamp" => BuiltinScalarFunction::ToTimestamp,
+ "to_timestamp_millis" => BuiltinScalarFunction::ToTimestampMillis,
+ "to_timestamp_micros" => BuiltinScalarFunction::ToTimestampMicros,
+ "to_timestamp_seconds" => BuiltinScalarFunction::ToTimestampSeconds,
"now" => BuiltinScalarFunction::Now,
"translate" => BuiltinScalarFunction::Translate,
"trim" => BuiltinScalarFunction::Trim,
@@ -412,6 +423,15 @@ pub fn return_type(
BuiltinScalarFunction::ToTimestamp => {
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
}
+ BuiltinScalarFunction::ToTimestampMillis => {
+ Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
+ }
+ BuiltinScalarFunction::ToTimestampMicros => {
+ Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
+ }
+ BuiltinScalarFunction::ToTimestampSeconds => {
+ Ok(DataType::Timestamp(TimeUnit::Second, None))
+ }
BuiltinScalarFunction::Now => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
BuiltinScalarFunction::Translate => utf8_to_str_type(&arg_types[0], "translate"),
BuiltinScalarFunction::Trim => utf8_to_str_type(&arg_types[0], "trim"),
@@ -896,9 +916,6 @@ pub fn create_physical_fun(
other,
))),
}),
- BuiltinScalarFunction::ToTimestamp => {
- Arc::new(datetime_expressions::to_timestamp)
- }
BuiltinScalarFunction::Translate => Arc::new(|args| match args[0].data_type() {
DataType::Utf8 => {
let func = invoke_if_unicode_expressions_feature_flag!(
@@ -934,6 +951,12 @@ pub fn create_physical_fun(
))),
}),
BuiltinScalarFunction::Upper => Arc::new(string_expressions::upper),
+ _ => {
+ return Err(DataFusionError::Internal(format!(
+ "create_physical_fun: Unsupported scalar function {:?}",
+ fun
+ )))
+ }
})
}
@@ -945,7 +968,94 @@ pub fn create_physical_expr(
input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn PhysicalExpr>> {
- let fun_expr = create_physical_fun(fun, ctx_state)?;
+ let fun_expr: ScalarFunctionImplementation = match fun {
+ // These functions need args and input schema to pick an implementation
+ // Unlike the string functions, which actually figure out the function to use with each array,
+ // here we return either a cast fn or string timestamp translation based on the expression data type
+ // so we don't have to pay a per-array/batch cost.
+ BuiltinScalarFunction::ToTimestamp => {
+ Arc::new(match args[0].data_type(input_schema) {
+ Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => {
+ |col_values: &[ColumnarValue]| {
+ cast_column(
+ &col_values[0],
+ &DataType::Timestamp(TimeUnit::Nanosecond, None),
+ &DEFAULT_DATAFUSION_CAST_OPTIONS,
+ )
+ }
+ }
+ Ok(DataType::Utf8) => datetime_expressions::to_timestamp,
+ other => {
+ return Err(DataFusionError::Internal(format!(
+ "Unsupported data type {:?} for function to_timestamp",
+ other,
+ )))
+ }
+ })
+ }
+ BuiltinScalarFunction::ToTimestampMillis => {
+ Arc::new(match args[0].data_type(input_schema) {
+ Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => {
+ |col_values: &[ColumnarValue]| {
+ cast_column(
+ &col_values[0],
+ &DataType::Timestamp(TimeUnit::Millisecond, None),
+ &DEFAULT_DATAFUSION_CAST_OPTIONS,
+ )
+ }
+ }
+ Ok(DataType::Utf8) => datetime_expressions::to_timestamp_millis,
+ other => {
+ return Err(DataFusionError::Internal(format!(
+ "Unsupported data type {:?} for function to_timestamp_millis",
+ other,
+ )))
+ }
+ })
+ }
+ BuiltinScalarFunction::ToTimestampMicros => {
+ Arc::new(match args[0].data_type(input_schema) {
+ Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => {
+ |col_values: &[ColumnarValue]| {
+ cast_column(
+ &col_values[0],
+ &DataType::Timestamp(TimeUnit::Microsecond, None),
+ &DEFAULT_DATAFUSION_CAST_OPTIONS,
+ )
+ }
+ }
+ Ok(DataType::Utf8) => datetime_expressions::to_timestamp_micros,
+ other => {
+ return Err(DataFusionError::Internal(format!(
+ "Unsupported data type {:?} for function to_timestamp_micros",
+ other,
+ )))
+ }
+ })
+ }
+ BuiltinScalarFunction::ToTimestampSeconds => Arc::new({
+ match args[0].data_type(input_schema) {
+ Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => {
+ |col_values: &[ColumnarValue]| {
+ cast_column(
+ &col_values[0],
+ &DataType::Timestamp(TimeUnit::Second, None),
+ &DEFAULT_DATAFUSION_CAST_OPTIONS,
+ )
+ }
+ }
+ Ok(DataType::Utf8) => datetime_expressions::to_timestamp_seconds,
+ other => {
+ return Err(DataFusionError::Internal(format!(
+ "Unsupported data type {:?} for function to_timestamp_seconds",
+ other,
+ )))
+ }
+ }
+ }),
+ // These don't need args and input schema
+ _ => create_physical_fun(fun, ctx_state)?,
+ };
let args = coerce(args, input_schema, &signature(fun))?;
let arg_types = args
@@ -1026,7 +1136,46 @@ fn signature(fun: &BuiltinScalarFunction) -> Signature {
Signature::Exact(vec![DataType::Utf8, DataType::Int64]),
Signature::Exact(vec![DataType::LargeUtf8, DataType::Int64]),
]),
- BuiltinScalarFunction::ToTimestamp => Signature::Uniform(1, vec![DataType::Utf8]),
+ BuiltinScalarFunction::ToTimestamp => Signature::Uniform(
+ 1,
+ vec![
+ DataType::Utf8,
+ DataType::Int64,
+ DataType::Timestamp(TimeUnit::Millisecond, None),
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ DataType::Timestamp(TimeUnit::Second, None),
+ ],
+ ),
+ BuiltinScalarFunction::ToTimestampMillis => Signature::Uniform(
+ 1,
+ vec![
+ DataType::Utf8,
+ DataType::Int64,
+ DataType::Timestamp(TimeUnit::Nanosecond, None),
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ DataType::Timestamp(TimeUnit::Second, None),
+ ],
+ ),
+ BuiltinScalarFunction::ToTimestampMicros => Signature::Uniform(
+ 1,
+ vec![
+ DataType::Utf8,
+ DataType::Int64,
+ DataType::Timestamp(TimeUnit::Nanosecond, None),
+ DataType::Timestamp(TimeUnit::Millisecond, None),
+ DataType::Timestamp(TimeUnit::Second, None),
+ ],
+ ),
+ BuiltinScalarFunction::ToTimestampSeconds => Signature::Uniform(
+ 1,
+ vec![
+ DataType::Utf8,
+ DataType::Int64,
+ DataType::Timestamp(TimeUnit::Nanosecond, None),
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ DataType::Timestamp(TimeUnit::Millisecond, None),
+ ],
+ ),
BuiltinScalarFunction::DateTrunc => Signature::Exact(vec![
DataType::Utf8,
DataType::Timestamp(TimeUnit::Nanosecond, None),
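`create_physical_expr` now chooses the implementation once, based on the argument's data type in the input schema. `Int64`/`Timestamp` inputs get a cast, `Utf8` inputs get string parsing, and anything else is an error. The self-contained sketch below models that plan-time dispatch. It is hypothetical: `Input` and `parse_stub` are stand-ins, not DataFusion APIs. It also shows why the `match` arms in the diff can mix a closure with a plain function such as `datetime_expressions::to_timestamp`: a non-capturing closure and a fn item both coerce to the same function-pointer type.

```rust
/// Hypothetical subset of input types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Utf8,
    Int64,
    Timestamp,
    Float64,
}

/// Hypothetical single input value (DataFusion passes `&[ColumnarValue]`).
#[derive(Debug, Clone, PartialEq)]
enum Input {
    Utf8(String),
    Int64(i64),
}

/// Implementation type chosen at plan time.
type ScalarFn = fn(&Input) -> Result<i64, String>;

/// Stand-in for a real timestamp parser such as `string_to_timestamp_nanos`:
/// it only accepts an integer string.
fn parse_stub(v: &Input) -> Result<i64, String> {
    match v {
        Input::Utf8(s) => s.trim().parse::<i64>().map_err(|e| e.to_string()),
        other => Err(format!("expected Utf8, got {:?}", other)),
    }
}

/// Decide once, from the declared argument type, which implementation to
/// use, so nothing is re-checked per batch at execution time.
fn pick_impl(arg_type: DataType) -> Result<ScalarFn, String> {
    let f: ScalarFn = match arg_type {
        // A non-capturing closure coerces to the fn-pointer type...
        DataType::Int64 | DataType::Timestamp => |v: &Input| match v {
            Input::Int64(n) => Ok(*n),
            other => Err(format!("expected an integer value, got {:?}", other)),
        },
        // ...and so does a plain fn item, so the arms unify.
        DataType::Utf8 => parse_stub,
        other => {
            return Err(format!(
                "Unsupported data type {:?} for function to_timestamp",
                other
            ))
        }
    };
    Ok(f)
}

fn main() {
    let f = pick_impl(DataType::Utf8).unwrap();
    for v in &[Input::Utf8("42".to_string()), Input::Utf8("oops".to_string())] {
        println!("{:?}", f(v));
    }
    println!("{:?}", pick_impl(DataType::Timestamp).map(|g| g(&Input::Int64(7))));
    println!("Float64 rejected: {}", pick_impl(DataType::Float64).is_err());
}
```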
diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs
index c7afbf5..c23674b 100644
--- a/datafusion/src/scalar.rs
+++ b/datafusion/src/scalar.rs
@@ -900,7 +900,10 @@ impl TryFrom<ScalarValue> for i64 {
fn try_from(value: ScalarValue) -> Result<Self> {
match value {
ScalarValue::Int64(Some(inner_value))
- | ScalarValue::TimestampNanosecond(Some(inner_value)) => Ok(inner_value),
+ | ScalarValue::TimestampNanosecond(Some(inner_value))
+ | ScalarValue::TimestampMicrosecond(Some(inner_value))
+ | ScalarValue::TimestampMillisecond(Some(inner_value))
+ | ScalarValue::TimestampSecond(Some(inner_value)) => Ok(inner_value),
_ => Err(DataFusionError::Internal(format!(
"Cannot convert {:?} to {}",
value,
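The widened `TryFrom<ScalarValue> for i64` uses one or-pattern so that every timestamp variant yields its inner value. The trimmed-down sketch below shows the same shape with a hypothetical local `ScalarValue` enum. Note that the conversion drops the unit, so the caller must already know which unit the returned `i64` is in.

```rust
use std::convert::TryFrom;

/// Hypothetical, trimmed-down `ScalarValue` with the variants that matter here.
#[derive(Debug, Clone, PartialEq)]
enum ScalarValue {
    Int64(Option<i64>),
    TimestampSecond(Option<i64>),
    TimestampMillisecond(Option<i64>),
    TimestampMicrosecond(Option<i64>),
    TimestampNanosecond(Option<i64>),
    Utf8(Option<String>),
}

impl TryFrom<ScalarValue> for i64 {
    type Error = String;

    fn try_from(value: ScalarValue) -> Result<Self, Self::Error> {
        match value {
            // One or-pattern covers every variant carrying a non-null i64;
            // the unit is discarded.
            ScalarValue::Int64(Some(inner))
            | ScalarValue::TimestampSecond(Some(inner))
            | ScalarValue::TimestampMillisecond(Some(inner))
            | ScalarValue::TimestampMicrosecond(Some(inner))
            | ScalarValue::TimestampNanosecond(Some(inner)) => Ok(inner),
            // Nulls and non-integer variants are rejected.
            other => Err(format!("Cannot convert {:?} to i64", other)),
        }
    }
}

fn main() {
    println!("{:?}", i64::try_from(ScalarValue::TimestampMillisecond(Some(1_599_572_549_190))));
    println!("{:?}", i64::try_from(ScalarValue::TimestampSecond(None)));
    println!("{:?}", i64::try_from(ScalarValue::Utf8(Some("12".to_string()))));
}
```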
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 21da793..b6393e9 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -27,7 +27,11 @@ extern crate datafusion;
use arrow::{array::*, datatypes::TimeUnit};
use arrow::{datatypes::Int32Type, datatypes::Int64Type, record_batch::RecordBatch};
use arrow::{
- datatypes::{DataType, Field, Schema, SchemaRef},
+ datatypes::{
+ ArrowNativeType, ArrowPrimitiveType, ArrowTimestampType, DataType, Field, Schema,
+ SchemaRef, TimestampMicrosecondType, TimestampMillisecondType,
+ TimestampNanosecondType, TimestampSecondType,
+ },
util::display::array_value_to_string,
};
@@ -1024,6 +1028,188 @@ async fn csv_query_cast_literal() -> Result<()> {
}
#[tokio::test]
+async fn query_cast_timestamp_millis() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+
+ let t1_schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, true)]));
+ let t1_data = RecordBatch::try_new(
+ t1_schema.clone(),
+ vec![Arc::new(Int64Array::from(vec![
+ 1235865600000,
+ 1235865660000,
+ 1238544000000,
+ ]))],
+ )?;
+ let t1_table = MemTable::try_new(t1_schema, vec![vec![t1_data]])?;
+ ctx.register_table("t1", Arc::new(t1_table))?;
+
+ let sql = "SELECT to_timestamp_millis(ts) FROM t1 LIMIT 3";
+ let actual = execute(&mut ctx, sql).await;
+ let expected = vec![
+ vec!["2009-03-01 00:00:00"],
+ vec!["2009-03-01 00:01:00"],
+ vec!["2009-04-01 00:00:00"],
+ ];
+ assert_eq!(expected, actual);
+ Ok(())
+}
+
+#[tokio::test]
+async fn query_cast_timestamp_micros() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+
+ let t1_schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, true)]));
+ let t1_data = RecordBatch::try_new(
+ t1_schema.clone(),
+ vec![Arc::new(Int64Array::from(vec![
+ 1235865600000000,
+ 1235865660000000,
+ 1238544000000000,
+ ]))],
+ )?;
+ let t1_table = MemTable::try_new(t1_schema, vec![vec![t1_data]])?;
+ ctx.register_table("t1", Arc::new(t1_table))?;
+
+ let sql = "SELECT to_timestamp_micros(ts) FROM t1 LIMIT 3";
+ let actual = execute(&mut ctx, sql).await;
+ let expected = vec![
+ vec!["2009-03-01 00:00:00"],
+ vec!["2009-03-01 00:01:00"],
+ vec!["2009-04-01 00:00:00"],
+ ];
+ assert_eq!(expected, actual);
+ Ok(())
+}
+
+#[tokio::test]
+async fn query_cast_timestamp_seconds() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+
+ let t1_schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, true)]));
+ let t1_data = RecordBatch::try_new(
+ t1_schema.clone(),
+ vec![Arc::new(Int64Array::from(vec![
+ 1235865600, 1235865660, 1238544000,
+ ]))],
+ )?;
+ let t1_table = MemTable::try_new(t1_schema, vec![vec![t1_data]])?;
+ ctx.register_table("t1", Arc::new(t1_table))?;
+
+ let sql = "SELECT to_timestamp_seconds(ts) FROM t1 LIMIT 3";
+ let actual = execute(&mut ctx, sql).await;
+ let expected = vec![
+ vec!["2009-03-01 00:00:00"],
+ vec!["2009-03-01 00:01:00"],
+ vec!["2009-04-01 00:00:00"],
+ ];
+ assert_eq!(expected, actual);
+ Ok(())
+}
+
+#[tokio::test]
+async fn query_cast_timestamp_nanos_to_others() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ ctx.register_table("ts_data", make_timestamp_nano_table()?)?;
+
+ // Original column is nanos, convert to millis and check timestamp
+ let sql = "SELECT to_timestamp_millis(ts) FROM ts_data LIMIT 3";
+ let actual = execute(&mut ctx, sql).await;
+ let expected = vec![
+ vec!["2020-09-08 13:42:29.190"],
+ vec!["2020-09-08 12:42:29.190"],
+ vec!["2020-09-08 11:42:29.190"],
+ ];
+ assert_eq!(expected, actual);
+
+ let sql = "SELECT to_timestamp_micros(ts) FROM ts_data LIMIT 3";
+ let actual = execute(&mut ctx, sql).await;
+ let expected = vec![
+ vec!["2020-09-08 13:42:29.190855"],
+ vec!["2020-09-08 12:42:29.190855"],
+ vec!["2020-09-08 11:42:29.190855"],
+ ];
+ assert_eq!(expected, actual);
+
+ let sql = "SELECT to_timestamp_seconds(ts) FROM ts_data LIMIT 3";
+ let actual = execute(&mut ctx, sql).await;
+ let expected = vec![
+ vec!["2020-09-08 13:42:29"],
+ vec!["2020-09-08 12:42:29"],
+ vec!["2020-09-08 11:42:29"],
+ ];
+ assert_eq!(expected, actual);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn query_cast_timestamp_seconds_to_others() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ ctx.register_table("ts_secs", make_timestamp_table::<TimestampSecondType>()?)?;
+
+ // Original column is seconds, convert to millis and check timestamp
+ let sql = "SELECT to_timestamp_millis(ts) FROM ts_secs LIMIT 3";
+ let actual = execute(&mut ctx, sql).await;
+ let expected = vec![
+ vec!["2020-09-08 13:42:29"],
+ vec!["2020-09-08 12:42:29"],
+ vec!["2020-09-08 11:42:29"],
+ ];
+ assert_eq!(expected, actual);
+
+ // Original column is seconds, convert to micros and check timestamp
+ let sql = "SELECT to_timestamp_micros(ts) FROM ts_secs LIMIT 3";
+ let actual = execute(&mut ctx, sql).await;
+ assert_eq!(expected, actual);
+
+ // to nanos
+ let sql = "SELECT to_timestamp(ts) FROM ts_secs LIMIT 3";
+ let actual = execute(&mut ctx, sql).await;
+ assert_eq!(expected, actual);
+ Ok(())
+}
+
+#[tokio::test]
+async fn query_cast_timestamp_micros_to_others() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ ctx.register_table(
+ "ts_micros",
+ make_timestamp_table::<TimestampMicrosecondType>()?,
+ )?;
+
+ // Original column is micros, convert to millis and check timestamp
+ let sql = "SELECT to_timestamp_millis(ts) FROM ts_micros LIMIT 3";
+ let actual = execute(&mut ctx, sql).await;
+ let expected = vec![
+ vec!["2020-09-08 13:42:29.190"],
+ vec!["2020-09-08 12:42:29.190"],
+ vec!["2020-09-08 11:42:29.190"],
+ ];
+ assert_eq!(expected, actual);
+
+ // Original column is micros, convert to seconds and check timestamp
+ let sql = "SELECT to_timestamp_seconds(ts) FROM ts_micros LIMIT 3";
+ let actual = execute(&mut ctx, sql).await;
+ let expected = vec![
+ vec!["2020-09-08 13:42:29"],
+ vec!["2020-09-08 12:42:29"],
+ vec!["2020-09-08 11:42:29"],
+ ];
+ assert_eq!(expected, actual);
+
+ // Original column is micros, convert to nanos and check timestamp
+ let sql = "SELECT to_timestamp(ts) FROM ts_micros LIMIT 3";
+ let actual = execute(&mut ctx, sql).await;
+ let expected = vec![
+ vec!["2020-09-08 13:42:29.190855"],
+ vec!["2020-09-08 12:42:29.190855"],
+ vec!["2020-09-08 11:42:29.190855"],
+ ];
+ assert_eq!(expected, actual);
+ Ok(())
+}
+
+#[tokio::test]
async fn union_all() -> Result<()> {
let mut ctx = ExecutionContext::new();
let sql = "SELECT 1 as x UNION ALL SELECT 2 as x";
@@ -2439,17 +2625,33 @@ async fn like() -> Result<()> {
Ok(())
}
-fn make_timestamp_nano_table() -> Result<Arc<MemTable>> {
+fn make_timestamp_table<A>() -> Result<Arc<MemTable>>
+where
+ A: ArrowTimestampType,
+{
let schema = Arc::new(Schema::new(vec![
- Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
+ Field::new("ts", DataType::Timestamp(A::get_time_unit(), None), false),
Field::new("value", DataType::Int32, true),
]));
- let mut builder = TimestampNanosecondArray::builder(3);
-
- builder.append_value(1599572549190855000)?; // 2020-09-08T13:42:29.190855+00:00
- builder.append_value(1599568949190855000)?; // 2020-09-08T12:42:29.190855+00:00
- builder.append_value(1599565349190855000)?; // 2020-09-08T11:42:29.190855+00:00
+ let mut builder = PrimitiveBuilder::<A>::new(3);
+
+ let nanotimestamps = vec![
+ 1599572549190855000i64, // 2020-09-08T13:42:29.190855+00:00
+ 1599568949190855000, // 2020-09-08T12:42:29.190855+00:00
+ 1599565349190855000, // 2020-09-08T11:42:29.190855+00:00
+ ];
+ let divisor = match A::get_time_unit() {
+ TimeUnit::Nanosecond => 1,
+ TimeUnit::Microsecond => 1000,
+ TimeUnit::Millisecond => 1_000_000,
+ TimeUnit::Second => 1_000_000_000,
+ };
+ for ts in nanotimestamps {
+ builder.append_value(
+ <A as ArrowPrimitiveType>::Native::from_i64(ts / divisor).unwrap(),
+ )?;
+ }
let data = RecordBatch::try_new(
schema.clone(),
@@ -2462,6 +2664,10 @@ fn make_timestamp_nano_table() -> Result<Arc<MemTable>> {
Ok(Arc::new(table))
}
+fn make_timestamp_nano_table() -> Result<Arc<MemTable>> {
+ make_timestamp_table::<TimestampNanosecondType>()
+}
+
#[tokio::test]
async fn to_timestamp() -> Result<()> {
let mut ctx = ExecutionContext::new();
@@ -2476,6 +2682,51 @@ async fn to_timestamp() -> Result<()> {
}
#[tokio::test]
+async fn to_timestamp_millis() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ ctx.register_table(
+ "ts_data",
+ make_timestamp_table::<TimestampMillisecondType>()?,
+ )?;
+
+ let sql = "SELECT COUNT(*) FROM ts_data where ts > to_timestamp_millis('2020-09-08T12:00:00+00:00')";
+ let actual = execute(&mut ctx, sql).await;
+
+ let expected = vec![vec!["2"]];
+ assert_eq!(expected, actual);
+ Ok(())
+}
+
+#[tokio::test]
+async fn to_timestamp_micros() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ ctx.register_table(
+ "ts_data",
+ make_timestamp_table::<TimestampMicrosecondType>()?,
+ )?;
+
+ let sql = "SELECT COUNT(*) FROM ts_data where ts > to_timestamp_micros('2020-09-08T12:00:00+00:00')";
+ let actual = execute(&mut ctx, sql).await;
+
+ let expected = vec![vec!["2"]];
+ assert_eq!(expected, actual);
+ Ok(())
+}
+
+#[tokio::test]
+async fn to_timestamp_seconds() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ ctx.register_table("ts_data", make_timestamp_table::<TimestampSecondType>()?)?;
+
+ let sql = "SELECT COUNT(*) FROM ts_data where ts > to_timestamp_seconds('2020-09-08T12:00:00+00:00')";
+ let actual = execute(&mut ctx, sql).await;
+
+ let expected = vec![vec!["2"]];
+ assert_eq!(expected, actual);
+ Ok(())
+}
+
+#[tokio::test]
async fn count_distinct_timestamps() -> Result<()> {
let mut ctx = ExecutionContext::new();
ctx.register_table("ts_data", make_timestamp_nano_table()?)?;
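The expected strings in the new tests can be checked by hand. The dependency-free sketch below renders an epoch value of a given unit as `YYYY-MM-DD HH:MM:SS[.fff]`. It adds 3, 6 or 9 fractional digits only when the sub-second part is non-zero, which matches the strings the tests expect. It also re-derives the count of 2 for `ts > ...('2020-09-08T12:00:00+00:00')`. The helper names are hypothetical. The calendar step uses Howard Hinnant's `civil_from_days` algorithm, which is not necessarily how Arrow formats timestamps.

```rust
/// Days since 1970-01-01 -> (year, month, day), proleptic Gregorian calendar.
/// Howard Hinnant's `civil_from_days` algorithm.
fn civil_from_days(days: i64) -> (i64, i64, i64) {
    let z = days + 719_468; // shift the epoch to 0000-03-01
    let era = z.div_euclid(146_097); // 400-year eras
    let doe = z - era * 146_097; // day of era, [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year
    let mp = (5 * doy + 2) / 153; // March-based month, [0, 11]
    let day = doy - (153 * mp + 2) / 5 + 1; // [1, 31]
    let month = if mp < 10 { mp + 3 } else { mp - 9 }; // [1, 12]
    let year = if month <= 2 { yoe + era * 400 + 1 } else { yoe + era * 400 };
    (year, month, day)
}

/// Render `value` (with `ticks_per_second` of 1, 1_000, 1_000_000 or
/// 1_000_000_000) as `YYYY-MM-DD HH:MM:SS`, plus 3, 6 or 9 fractional
/// digits when the sub-second part is non-zero.
fn format_epoch(value: i64, ticks_per_second: i64) -> String {
    let secs = value.div_euclid(ticks_per_second);
    let nanos = value.rem_euclid(ticks_per_second) * (1_000_000_000 / ticks_per_second);
    let (days, sod) = (secs.div_euclid(86_400), secs.rem_euclid(86_400));
    let (y, m, d) = civil_from_days(days);
    let mut out = format!(
        "{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
        y, m, d, sod / 3_600, sod % 3_600 / 60, sod % 60
    );
    if nanos != 0 {
        if nanos % 1_000_000 == 0 {
            out.push_str(&format!(".{:03}", nanos / 1_000_000));
        } else if nanos % 1_000 == 0 {
            out.push_str(&format!(".{:06}", nanos / 1_000));
        } else {
            out.push_str(&format!(".{:09}", nanos));
        }
    }
    out
}

fn main() {
    // Int64 inputs from the query_cast_timestamp_{millis,micros,seconds} tests.
    println!("{}", format_epoch(1_235_865_600_000, 1_000));
    println!("{}", format_epoch(1_235_865_660_000_000, 1_000_000));
    println!("{}", format_epoch(1_238_544_000, 1));
    // First value in make_timestamp_table, scaled like its `divisor`.
    let nanos = 1_599_572_549_190_855_000_i64;
    println!("{}", format_epoch(nanos, 1_000_000_000));
    println!("{}", format_epoch(nanos / 1_000_000, 1_000));
    // Seconds-resolution table filtered by ts > 2020-09-08T12:00:00Z.
    let secs = [1_599_572_549_i64, 1_599_568_949, 1_599_565_349];
    let noon = 1_599_566_400_i64;
    println!("{}", secs.iter().filter(|&&t| t > noon).count());
}
```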
diff --git a/docs/user-guide/src/SUMMARY.md b/docs/user-guide/src/SUMMARY.md
index aa101b3..516fcce 100644
--- a/docs/user-guide/src/SUMMARY.md
+++ b/docs/user-guide/src/SUMMARY.md
@@ -27,6 +27,7 @@
- [SELECT](sql/select.md)
- [DDL](sql/ddl.md)
- [CREATE EXTERNAL TABLE](sql/ddl.md)
+ - [DataFusion-Specific Functions](sql/datafusion-functions.md)
- [Distributed](distributed/introduction.md)
- [Create a Ballista Cluster](distributed/deployment.md)
diff --git a/docs/user-guide/src/sql/datafusion-functions.md b/docs/user-guide/src/sql/datafusion-functions.md
new file mode 100644
index 0000000..8431baf
--- /dev/null
+++ b/docs/user-guide/src/sql/datafusion-functions.md
@@ -0,0 +1,86 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# DataFusion-Specific Functions
+
+These SQL functions are either specific to DataFusion, or are well known but have behavior that is specific to DataFusion. In particular, the `to_timestamp_xx()` functions exist because Arrow supports multiple timestamp resolutions.
+
+## `to_timestamp`
+
+`to_timestamp()` is similar to the standard SQL function. It converts the following inputs to type `Timestamp(Nanosecond, None)`:
+
+- Timestamp strings, for example:
+  - `1997-01-31T09:26:56.123Z` # RFC 3339
+  - `1997-01-31T09:26:56.123-05:00` # RFC 3339
+  - `1997-01-31 09:26:56.123-05:00` # close to RFC 3339, but with a space rather than T
+  - `1997-01-31T09:26:56.123` # close to RFC 3339, but no timezone offset specified
+  - `1997-01-31 09:26:56.123` # close to RFC 3339, but with a space and no timezone offset
+  - `1997-01-31 09:26:56` # close to RFC 3339, no fractional seconds
+- An Int64 array/column, whose values are interpreted as nanoseconds since the Unix epoch (UTC)
+- Other `Timestamp` columns or values, of any resolution
+
+Note that conversions from other `Timestamp` types and from Int64 can also be performed with `CAST(.. AS Timestamp)`; `to_timestamp()` accepts these inputs as well, for consistency with the other `to_timestamp_xx()` functions.
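To make the accepted string formats concrete, here is a minimal Rust sketch of parsing them into nanoseconds since the Unix epoch. It is not DataFusion's implementation: `parse_timestamp_nanos` and `days_from_civil` are hypothetical helpers, the date arithmetic follows Howard Hinnant's well-known `days_from_civil` algorithm, field ranges are not validated, and strings without an offset are simply treated as UTC, which may differ from how DataFusion interprets them.

```rust
/// Days since 1970-01-01 for a proleptic Gregorian date, following
/// Howard Hinnant's `days_from_civil` algorithm.
fn days_from_civil(year: i64, month: i64, day: i64) -> i64 {
    // Treat January and February as months 10 and 11 of the previous year.
    let y = if month <= 2 { year - 1 } else { year };
    let era = y.div_euclid(400);
    let yoe = y.rem_euclid(400); // year of era, [0, 399]
    let mp = (month + 9) % 12; // March = 0, ..., February = 11
    let doy = (153 * mp + 2) / 5 + day - 1; // day of March-based year, [0, 365]
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; // day of era, [0, 146096]
    era * 146_097 + doe - 719_468
}

/// Parses `YYYY-MM-DD[T| ]HH:MM:SS[.fraction][Z|+HH:MM|-HH:MM]` into nanoseconds
/// since the Unix epoch. Hypothetical helper: offset-less input is treated
/// as UTC, and field ranges (month 1-12, etc.) are not validated.
fn parse_timestamp_nanos(s: &str) -> Option<i64> {
    let b = s.as_bytes();
    if b.len() < 19 || b[4] != b'-' || b[7] != b'-' || b[13] != b':' || b[16] != b':' {
        return None;
    }
    // Accept both the RFC 3339 'T' separator and a space.
    if b[10] != b'T' && b[10] != b' ' {
        return None;
    }
    // Parses an all-digit substring of `s` as an integer.
    let num = |r: std::ops::Range<usize>| -> Option<i64> {
        let t = s.get(r)?;
        if t.bytes().all(|c| c.is_ascii_digit()) { t.parse::<i64>().ok() } else { None }
    };
    let (year, month, day) = (num(0..4)?, num(5..7)?, num(8..10)?);
    let (hour, minute, second) = (num(11..13)?, num(14..16)?, num(17..19)?);

    // Optional fractional seconds, right-padded to nanoseconds (".123" -> 123_000_000).
    let mut rest = &s[19..];
    let mut frac_nanos = 0;
    if let Some(frac) = rest.strip_prefix('.') {
        let digits = frac.bytes().take_while(|c| c.is_ascii_digit()).count();
        if digits == 0 || digits > 9 {
            return None;
        }
        frac_nanos = frac[..digits].parse::<i64>().ok()? * 10_i64.pow(9 - digits as u32);
        rest = &frac[digits..];
    }

    // Optional offset: none or "Z" means UTC; otherwise "+HH:MM" or "-HH:MM".
    let offset_secs = match rest {
        "" | "Z" => 0,
        _ if rest.len() == 6 && rest.as_bytes()[3] == b':' => {
            let sign = match rest.as_bytes()[0] {
                b'+' => 1,
                b'-' => -1,
                _ => return None,
            };
            let oh: i64 = rest.get(1..3)?.parse().ok()?;
            let om: i64 = rest.get(4..6)?.parse().ok()?;
            sign * (oh * 3_600 + om * 60)
        }
        _ => return None,
    };

    // Local wall-clock time minus its UTC offset gives UTC.
    let secs = days_from_civil(year, month, day) * 86_400 + hour * 3_600 + minute * 60 + second
        - offset_secs;
    Some(secs * 1_000_000_000 + frac_nanos)
}
```

With this sketch, `1997-01-31T09:26:56.123Z` parses to `854_702_816_123_000_000`, and the `-05:00` variant of the same wall-clock time lands 18,000 seconds (five hours) later.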
+
+## `to_timestamp_millis`
+
+`to_timestamp_millis()` converts the following inputs to type `Timestamp(Millisecond, None)`:
+
+- Timestamp strings, in the same formats supported by `to_timestamp()` (but the output has millisecond resolution):
+  - `1997-01-31T09:26:56.123Z` # RFC 3339
+  - `1997-01-31T09:26:56.123-05:00` # RFC 3339
+  - `1997-01-31 09:26:56.123-05:00` # close to RFC 3339, but with a space rather than T
+  - `1997-01-31T09:26:56.123` # close to RFC 3339, but no timezone offset specified
+  - `1997-01-31 09:26:56.123` # close to RFC 3339, but with a space and no timezone offset
+  - `1997-01-31 09:26:56` # close to RFC 3339, no fractional seconds
+- An Int64 array/column, whose values are interpreted as milliseconds since the Unix epoch (UTC)
+- Other `Timestamp` columns or values, of any resolution
+
+Note that `CAST(.. AS Timestamp)` always produces timestamps with nanosecond resolution; this function is the only way to convert or cast to millisecond resolution.
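One way to picture the millisecond variant is as a rescaling step: a string is first parsed to nanoseconds (as for `to_timestamp()`) and then reduced to milliseconds, while an Int64 input is already taken as milliseconds. The Rust sketch below assumes truncating integer division for the rescale; `MillisInput` and `to_millis` are hypothetical names for illustration, not DataFusion APIs.

```rust
/// Hypothetical input kinds for a `to_timestamp_millis`-style conversion.
enum MillisInput {
    /// A timestamp string that has already been parsed to nanoseconds since the epoch.
    ParsedNanos(i64),
    /// A raw Int64 value, interpreted as milliseconds since the epoch.
    Int64(i64),
}

/// Produces milliseconds since the Unix epoch (UTC).
fn to_millis(input: MillisInput) -> i64 {
    match input {
        // Truncating integer division drops the sub-millisecond digits.
        MillisInput::ParsedNanos(nanos) => nanos / 1_000_000,
        // Int64 input is already in the target unit and passes through unchanged.
        MillisInput::Int64(millis) => millis,
    }
}
```

Note how the same Int64 value denotes different instants depending on the function: `1_000` is one second after the epoch here, whereas `to_timestamp()` would read it as one microsecond after the epoch.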
+
+## `to_timestamp_micros`
+
+`to_timestamp_micros()` converts the following inputs to type `Timestamp(Microsecond, None)`:
+
+- Timestamp strings, in the same formats supported by `to_timestamp()` (but the output has microsecond resolution):
+  - `1997-01-31T09:26:56.123Z` # RFC 3339
+  - `1997-01-31T09:26:56.123-05:00` # RFC 3339
+  - `1997-01-31 09:26:56.123-05:00` # close to RFC 3339, but with a space rather than T
+  - `1997-01-31T09:26:56.123` # close to RFC 3339, but no timezone offset specified
+  - `1997-01-31 09:26:56.123` # close to RFC 3339, but with a space and no timezone offset
+  - `1997-01-31 09:26:56` # close to RFC 3339, no fractional seconds
+- An Int64 array/column, whose values are interpreted as microseconds since the Unix epoch (UTC)
+- Other `Timestamp` columns or values, of any resolution
+
+Note that `CAST(.. AS Timestamp)` always produces timestamps with nanosecond resolution; this function is the only way to convert or cast to microsecond resolution.
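Converting from another `Timestamp` type amounts to rescaling an i64 between units. A minimal sketch, assuming truncating division when moving to a coarser unit and multiplication (which can overflow i64) when moving to a finer one; the `Unit` enum borrows the names of Arrow's time units but is defined locally and is hypothetical, as is `rescale`.

```rust
/// Locally defined stand-in for a timestamp resolution (hypothetical, not Arrow's type).
#[derive(Clone, Copy, Debug)]
enum Unit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

impl Unit {
    /// Number of ticks of this unit in one second.
    fn ticks_per_second(self) -> i64 {
        match self {
            Unit::Second => 1,
            Unit::Millisecond => 1_000,
            Unit::Microsecond => 1_000_000,
            Unit::Nanosecond => 1_000_000_000,
        }
    }
}

/// Rescales an epoch-based timestamp value from one unit to another.
/// Returns `None` if scaling to a finer unit overflows `i64`.
fn rescale(value: i64, from: Unit, to: Unit) -> Option<i64> {
    let (f, t) = (from.ticks_per_second(), to.ticks_per_second());
    if t >= f {
        // Finer (or equal) target: multiply; checked because this can overflow.
        value.checked_mul(t / f)
    } else {
        // Coarser target: truncating division drops the extra precision.
        Some(value / (f / t))
    }
}
```

The overflow check matters on the multiplication path: an i64 count of nanoseconds only spans roughly the years 1677 to 2262, while the same i64 counted in seconds covers a far wider range.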
+
+## `to_timestamp_seconds`
+
+`to_timestamp_seconds()` converts the following inputs to type `Timestamp(Second, None)`:
+
+- Timestamp strings, in the same formats supported by `to_timestamp()` (but the output has second resolution):
+  - `1997-01-31T09:26:56.123Z` # RFC 3339
+  - `1997-01-31T09:26:56.123-05:00` # RFC 3339
+  - `1997-01-31 09:26:56.123-05:00` # close to RFC 3339, but with a space rather than T
+  - `1997-01-31T09:26:56.123` # close to RFC 3339, but no timezone offset specified
+  - `1997-01-31 09:26:56.123` # close to RFC 3339, but with a space and no timezone offset
+  - `1997-01-31 09:26:56` # close to RFC 3339, no fractional seconds
+- An Int64 array/column, whose values are interpreted as seconds since the Unix epoch (UTC)
+- Other `Timestamp` columns or values, of any resolution
+
+Note that `CAST(.. AS Timestamp)` always produces timestamps with nanosecond resolution; this function is the only way to convert or cast to second resolution.
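To see which instant an Int64 input to `to_timestamp_seconds()` denotes, the sketch below formats a count of seconds since the Unix epoch as a UTC date-time string. It is an illustration only, not part of DataFusion: `format_epoch_seconds` and `civil_from_days` are hypothetical helpers, and the date arithmetic follows Howard Hinnant's `civil_from_days` algorithm.

```rust
/// Converts days since 1970-01-01 to a proleptic Gregorian (year, month, day),
/// following Howard Hinnant's `civil_from_days` algorithm.
fn civil_from_days(days: i64) -> (i64, i64, i64) {
    let z = days + 719_468;
    let era = z.div_euclid(146_097);
    let doe = z.rem_euclid(146_097); // day of era, [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year, [0, 365]
    let mp = (5 * doy + 2) / 153; // March = 0, ..., February = 11
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    // January and February belong to the following calendar year.
    let year = yoe + era * 400 + i64::from(month <= 2);
    (year, month, day)
}

/// Formats seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ` (UTC).
fn format_epoch_seconds(secs: i64) -> String {
    // Euclidean division keeps the time of day non-negative for pre-1970 values.
    let (days, sod) = (secs.div_euclid(86_400), secs.rem_euclid(86_400));
    let (y, m, d) = civil_from_days(days);
    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        y,
        m,
        d,
        sod / 3_600,
        sod % 3_600 / 60,
        sod % 60
    )
}
```

For example, `format_epoch_seconds(854_702_816)` returns `"1997-01-31T09:26:56Z"`, the whole-second part of the `1997-01-31T09:26:56.123Z` string example listed above.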