You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@arrow.apache.org by al...@apache.org on 2022/07/18 19:55:14 UTC
[arrow-datafusion] branch master updated: Add `from_unixtime` function (#2924)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new b772c6df9 Add `from_unixtime` function (#2924)
b772c6df9 is described below
commit b772c6df99fa23bbb75b65d06afb2e4dd3bba697
Author: Wei-Ting Kuo <wa...@gmail.com>
AuthorDate: Tue Jul 19 03:55:08 2022 +0800
Add `from_unixtime` function (#2924)
* add from_unixtime
* format from_unixtime-related functions with rustfmt
* delete debug information
---
datafusion/core/src/logical_plan/mod.rs | 4 +--
datafusion/core/src/prelude.rs | 10 +++----
datafusion/core/tests/sql/timestamp.rs | 50 +++++++++++++++++++++++++++++++
datafusion/expr/src/built_in_function.rs | 4 +++
datafusion/expr/src/expr_fn.rs | 2 ++
datafusion/expr/src/function.rs | 6 ++++
datafusion/physical-expr/src/functions.rs | 17 +++++++++++
datafusion/proto/proto/datafusion.proto | 1 +
datafusion/proto/src/from_proto.rs | 6 +++-
datafusion/proto/src/to_proto.rs | 1 +
10 files changed, 93 insertions(+), 8 deletions(-)
diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs
index f568c88ee..e4e26ad54 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -38,8 +38,8 @@ pub use datafusion_expr::{
ExprRewriter, RewriteRecursion,
},
expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
- floor, in_list, in_subquery, initcap, left, length, lit, lit_timestamp_nano, ln,
- log10, log2,
+ floor, from_unixtime, in_list, in_subquery, initcap, left, length, lit,
+ lit_timestamp_nano, ln, log10, log2,
logical_plan::{
builder::{
build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs
index c5f2b4fbf..eae96ce48 100644
--- a/datafusion/core/src/prelude.rs
+++ b/datafusion/core/src/prelude.rs
@@ -33,9 +33,9 @@ pub use crate::execution::options::{
pub use crate::logical_plan::{
approx_percentile_cont, array, ascii, avg, bit_length, btrim, character_length, chr,
coalesce, col, concat, concat_ws, count, create_udf, date_part, date_trunc, digest,
- exists, in_list, in_subquery, initcap, left, length, lit, lower, lpad, ltrim, max,
- md5, min, not_exists, not_in_subquery, now, octet_length, random, regexp_match,
- regexp_replace, repeat, replace, reverse, right, rpad, rtrim, scalar_subquery,
- sha224, sha256, sha384, sha512, split_part, starts_with, strpos, substr, sum, to_hex,
- translate, trim, upper, Column, Expr, JoinType, Partitioning,
+ exists, from_unixtime, in_list, in_subquery, initcap, left, length, lit, lower, lpad,
+ ltrim, max, md5, min, not_exists, not_in_subquery, now, octet_length, random,
+ regexp_match, regexp_replace, repeat, replace, reverse, right, rpad, rtrim,
+ scalar_subquery, sha224, sha256, sha384, sha512, split_part, starts_with, strpos,
+ substr, sum, to_hex, translate, trim, upper, Column, Expr, JoinType, Partitioning,
};
diff --git a/datafusion/core/tests/sql/timestamp.rs b/datafusion/core/tests/sql/timestamp.rs
index 9acc3f3cb..8452fcf81 100644
--- a/datafusion/core/tests/sql/timestamp.rs
+++ b/datafusion/core/tests/sql/timestamp.rs
@@ -266,6 +266,37 @@ async fn query_cast_timestamp_micros_to_others() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn query_cast_timestamp_from_unixtime() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let t1_schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, true)]));
+ let t1_data = RecordBatch::try_new(
+ t1_schema.clone(),
+ vec![Arc::new(Int64Array::from(vec![
+ 1235865600, 1235865660, 1238544000,
+ ]))],
+ )?;
+ let t1_table = MemTable::try_new(t1_schema, vec![vec![t1_data]])?;
+ ctx.register_table("t1", Arc::new(t1_table))?;
+
+ let sql = "SELECT from_unixtime(ts) FROM t1 LIMIT 3";
+ let actual = execute_to_batches(&ctx, sql).await;
+
+ let expected = vec![
+ "+---------------------+",
+ "| fromunixtime(t1.ts) |",
+ "+---------------------+",
+ "| 2009-03-01 00:00:00 |",
+ "| 2009-03-01 00:01:00 |",
+ "| 2009-04-01 00:00:00 |",
+ "+---------------------+",
+ ];
+
+ assert_batches_eq!(expected, &actual);
+ Ok(())
+}
+
#[tokio::test]
async fn to_timestamp() -> Result<()> {
let ctx = SessionContext::new();
@@ -347,6 +378,25 @@ async fn to_timestamp_seconds() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn from_unixtime() -> Result<()> {
+ let ctx = SessionContext::new();
+ ctx.register_table("ts_data", make_timestamp_table::<TimestampSecondType>()?)?;
+
+ let sql = "SELECT COUNT(*) FROM ts_data where ts > from_unixtime(1599566400)"; // '2020-09-08T12:00:00+00:00'
+ let actual = execute_to_batches(&ctx, sql).await;
+
+ let expected = vec![
+ "+-----------------+",
+ "| COUNT(UInt8(1)) |",
+ "+-----------------+",
+ "| 2 |",
+ "+-----------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ Ok(())
+}
+
#[tokio::test]
async fn count_distinct_timestamps() -> Result<()> {
let ctx = SessionContext::new();
diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs
index 323963c56..663888e2e 100644
--- a/datafusion/expr/src/built_in_function.rs
+++ b/datafusion/expr/src/built_in_function.rs
@@ -148,6 +148,8 @@ pub enum BuiltinScalarFunction {
ToTimestampMicros,
/// to_timestamp_seconds
ToTimestampSeconds,
+ /// from_unixtime
+ FromUnixtime,
///now
Now,
/// translate
@@ -239,6 +241,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Upper => Volatility::Immutable,
BuiltinScalarFunction::RegexpMatch => Volatility::Immutable,
BuiltinScalarFunction::Struct => Volatility::Immutable,
+ BuiltinScalarFunction::FromUnixtime => Volatility::Immutable,
// Stable builtin functions
BuiltinScalarFunction::Now => Volatility::Stable,
@@ -333,6 +336,7 @@ impl FromStr for BuiltinScalarFunction {
"upper" => BuiltinScalarFunction::Upper,
"regexp_match" => BuiltinScalarFunction::RegexpMatch,
"struct" => BuiltinScalarFunction::Struct,
+ "from_unixtime" => BuiltinScalarFunction::FromUnixtime,
_ => {
return Err(DataFusionError::Plan(format!(
"There is no built-in function named {}",
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 76bd9a975..abfd37a7c 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -352,6 +352,7 @@ scalar_expr!(DateTrunc, date_trunc, part, date);
scalar_expr!(ToTimestampMillis, to_timestamp_millis, date);
scalar_expr!(ToTimestampMicros, to_timestamp_micros, date);
scalar_expr!(ToTimestampSeconds, to_timestamp_seconds, date);
+scalar_expr!(FromUnixtime, from_unixtime, unixtime);
/// Returns an array of fixed size with each argument on it.
pub fn array(args: Vec<Expr>) -> Expr {
@@ -601,6 +602,7 @@ mod test {
test_scalar_expr!(DatePart, date_part, part, date);
test_scalar_expr!(DateTrunc, date_trunc, part, date);
+ test_scalar_expr!(FromUnixtime, from_unixtime, unixtime);
}
#[test]
diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs
index fc60e9c86..331756f8d 100644
--- a/datafusion/expr/src/function.rs
+++ b/datafusion/expr/src/function.rs
@@ -194,6 +194,9 @@ pub fn return_type(
BuiltinScalarFunction::ToTimestampSeconds => {
Ok(DataType::Timestamp(TimeUnit::Second, None))
}
+ BuiltinScalarFunction::FromUnixtime => {
+ Ok(DataType::Timestamp(TimeUnit::Second, None))
+ }
BuiltinScalarFunction::Now => Ok(DataType::Timestamp(
TimeUnit::Nanosecond,
Some("UTC".to_owned()),
@@ -381,6 +384,9 @@ pub fn signature(fun: &BuiltinScalarFunction) -> Signature {
],
fun.volatility(),
),
+ BuiltinScalarFunction::FromUnixtime => {
+ Signature::uniform(1, vec![DataType::Int64], fun.volatility())
+ }
BuiltinScalarFunction::Digest => {
Signature::exact(vec![DataType::Utf8, DataType::Utf8], fun.volatility())
}
diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs
index 75604a532..5f0e711f8 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -152,6 +152,23 @@ pub fn create_physical_expr(
}
}
}),
+ BuiltinScalarFunction::FromUnixtime => Arc::new({
+ match coerced_phy_exprs[0].data_type(input_schema) {
+ Ok(DataType::Int64) => |col_values: &[ColumnarValue]| {
+ cast_column(
+ &col_values[0],
+ &DataType::Timestamp(TimeUnit::Second, None),
+ &DEFAULT_DATAFUSION_CAST_OPTIONS,
+ )
+ },
+ other => {
+ return Err(DataFusionError::Internal(format!(
+ "Unsupported data type {:?} for function from_unixtime",
+ other,
+ )))
+ }
+ }
+ }),
// These don't need args and input schema
_ => create_physical_fun(fun, execution_props)?,
};
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index d7165c5d3..39c254ea7 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -438,6 +438,7 @@ enum ScalarFunction {
Coalesce=63;
Power=64;
StructFun=65;
+ FromUnixtime=66;
}
message ScalarFunctionNode {
diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs
index 1cd6edfe8..cb7b11189 100644
--- a/datafusion/proto/src/from_proto.rs
+++ b/datafusion/proto/src/from_proto.rs
@@ -34,7 +34,7 @@ use datafusion_expr::expr::GroupingSet::GroupingSets;
use datafusion_expr::{
abs, acos, array, ascii, asin, atan, bit_length, btrim, ceil, character_length, chr,
coalesce, concat_expr, concat_ws_expr, cos, date_part, date_trunc, digest, exp,
- floor, left, ln, log10, log2,
+ floor, from_unixtime, left, ln, log10, log2,
logical_plan::{PlanType, StringifiedPlan},
lower, lpad, ltrim, md5, now_expr, nullif, octet_length, power, random, regexp_match,
regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim, sha224, sha256,
@@ -473,6 +473,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::Coalesce => Self::Coalesce,
ScalarFunction::Power => Self::Power,
ScalarFunction::StructFun => Self::Struct,
+ ScalarFunction::FromUnixtime => Self::FromUnixtime,
}
}
}
@@ -1128,6 +1129,9 @@ pub fn parse_expr(
parse_expr(&args[0], registry)?,
parse_expr(&args[1], registry)?,
)),
+ ScalarFunction::FromUnixtime => {
+ Ok(from_unixtime(parse_expr(&args[0], registry)?))
+ }
_ => Err(proto_error(
"Protobuf deserialization error: Unsupported scalar function",
)),
diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs
index d5ba5c38a..fd5276ca8 100644
--- a/datafusion/proto/src/to_proto.rs
+++ b/datafusion/proto/src/to_proto.rs
@@ -1120,6 +1120,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
BuiltinScalarFunction::Coalesce => Self::Coalesce,
BuiltinScalarFunction::Power => Self::Power,
BuiltinScalarFunction::Struct => Self::StructFun,
+ BuiltinScalarFunction::FromUnixtime => Self::FromUnixtime,
};
Ok(scalar_function)