You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/07/18 19:55:14 UTC

[arrow-datafusion] branch master updated: Add `from_unixtime` function (#2924)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new b772c6df9 Add `from_unixtime` function (#2924)
b772c6df9 is described below

commit b772c6df99fa23bbb75b65d06afb2e4dd3bba697
Author: Wei-Ting Kuo <wa...@gmail.com>
AuthorDate: Tue Jul 19 03:55:08 2022 +0800

    Add `from_unixtime` function (#2924)
    
    * add from_unixtime
    
    * format from_unixtime related functions by rustfmt
    
    * delete debug information
---
 datafusion/core/src/logical_plan/mod.rs   |  4 +--
 datafusion/core/src/prelude.rs            | 10 +++----
 datafusion/core/tests/sql/timestamp.rs    | 50 +++++++++++++++++++++++++++++++
 datafusion/expr/src/built_in_function.rs  |  4 +++
 datafusion/expr/src/expr_fn.rs            |  2 ++
 datafusion/expr/src/function.rs           |  6 ++++
 datafusion/physical-expr/src/functions.rs | 17 +++++++++++
 datafusion/proto/proto/datafusion.proto   |  1 +
 datafusion/proto/src/from_proto.rs        |  6 +++-
 datafusion/proto/src/to_proto.rs          |  1 +
 10 files changed, 93 insertions(+), 8 deletions(-)

diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs
index f568c88ee..e4e26ad54 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -38,8 +38,8 @@ pub use datafusion_expr::{
         ExprRewriter, RewriteRecursion,
     },
     expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
-    floor, in_list, in_subquery, initcap, left, length, lit, lit_timestamp_nano, ln,
-    log10, log2,
+    floor, from_unixtime, in_list, in_subquery, initcap, left, length, lit,
+    lit_timestamp_nano, ln, log10, log2,
     logical_plan::{
         builder::{
             build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs
index c5f2b4fbf..eae96ce48 100644
--- a/datafusion/core/src/prelude.rs
+++ b/datafusion/core/src/prelude.rs
@@ -33,9 +33,9 @@ pub use crate::execution::options::{
 pub use crate::logical_plan::{
     approx_percentile_cont, array, ascii, avg, bit_length, btrim, character_length, chr,
     coalesce, col, concat, concat_ws, count, create_udf, date_part, date_trunc, digest,
-    exists, in_list, in_subquery, initcap, left, length, lit, lower, lpad, ltrim, max,
-    md5, min, not_exists, not_in_subquery, now, octet_length, random, regexp_match,
-    regexp_replace, repeat, replace, reverse, right, rpad, rtrim, scalar_subquery,
-    sha224, sha256, sha384, sha512, split_part, starts_with, strpos, substr, sum, to_hex,
-    translate, trim, upper, Column, Expr, JoinType, Partitioning,
+    exists, from_unixtime, in_list, in_subquery, initcap, left, length, lit, lower, lpad,
+    ltrim, max, md5, min, not_exists, not_in_subquery, now, octet_length, random,
+    regexp_match, regexp_replace, repeat, replace, reverse, right, rpad, rtrim,
+    scalar_subquery, sha224, sha256, sha384, sha512, split_part, starts_with, strpos,
+    substr, sum, to_hex, translate, trim, upper, Column, Expr, JoinType, Partitioning,
 };
diff --git a/datafusion/core/tests/sql/timestamp.rs b/datafusion/core/tests/sql/timestamp.rs
index 9acc3f3cb..8452fcf81 100644
--- a/datafusion/core/tests/sql/timestamp.rs
+++ b/datafusion/core/tests/sql/timestamp.rs
@@ -266,6 +266,37 @@ async fn query_cast_timestamp_micros_to_others() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn query_cast_timestamp_from_unixtime() -> Result<()> { // checks `from_unixtime` applied to an Int64 column of an in-memory table
+    let ctx = SessionContext::new();
+
+    let t1_schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, true)])); // one nullable Int64 column named "ts"
+    let t1_data = RecordBatch::try_new(
+        t1_schema.clone(),
+        vec![Arc::new(Int64Array::from(vec![
+            1235865600, 1235865660, 1238544000, // integer inputs; the expected table below lists the timestamp each one should render as
+        ]))],
+    )?;
+    let t1_table = MemTable::try_new(t1_schema, vec![vec![t1_data]])?; // a single partition holding the single batch
+    ctx.register_table("t1", Arc::new(t1_table))?;
+
+    let sql = "SELECT from_unixtime(ts) FROM t1 LIMIT 3";
+    let actual = execute_to_batches(&ctx, sql).await;
+
+    let expected = vec![
+        "+---------------------+",
+        "| fromunixtime(t1.ts) |", // expected column header for the from_unixtime(ts) projection
+        "+---------------------+",
+        "| 2009-03-01 00:00:00 |", // expected rendering of 1235865600
+        "| 2009-03-01 00:01:00 |", // expected rendering of 1235865660
+        "| 2009-04-01 00:00:00 |", // expected rendering of 1238544000
+        "+---------------------+",
+    ];
+
+    assert_batches_eq!(expected, &actual); // compares the expected lines against the query's result batches
+    Ok(())
+}
+
 #[tokio::test]
 async fn to_timestamp() -> Result<()> {
     let ctx = SessionContext::new();
@@ -347,6 +378,25 @@ async fn to_timestamp_seconds() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn from_unixtime() -> Result<()> { // checks `from_unixtime` with an integer literal used in a WHERE comparison
+    let ctx = SessionContext::new();
+    ctx.register_table("ts_data", make_timestamp_table::<TimestampSecondType>()?)?; // NOTE(review): table contents come from make_timestamp_table, which is defined outside this diff — confirm its rows when changing the expected count
+
+    let sql = "SELECT COUNT(*) FROM ts_data where ts > from_unixtime(1599566400)"; // '2020-09-08T12:00:00+00:00'
+    let actual = execute_to_batches(&ctx, sql).await;
+
+    let expected = vec![
+        "+-----------------+",
+        "| COUNT(UInt8(1)) |", // expected column header for COUNT(*)
+        "+-----------------+",
+        "| 2               |", // expected number of rows whose ts is greater than the cutoff
+        "+-----------------+",
+    ];
+    assert_batches_eq!(expected, &actual); // compares the expected lines against the query's result batches
+    Ok(())
+}
+
 #[tokio::test]
 async fn count_distinct_timestamps() -> Result<()> {
     let ctx = SessionContext::new();
diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs
index 323963c56..663888e2e 100644
--- a/datafusion/expr/src/built_in_function.rs
+++ b/datafusion/expr/src/built_in_function.rs
@@ -148,6 +148,8 @@ pub enum BuiltinScalarFunction {
     ToTimestampMicros,
     /// to_timestamp_seconds
     ToTimestampSeconds,
+    /// from_unixtime
+    FromUnixtime,
     ///now
     Now,
     /// translate
@@ -239,6 +241,7 @@ impl BuiltinScalarFunction {
             BuiltinScalarFunction::Upper => Volatility::Immutable,
             BuiltinScalarFunction::RegexpMatch => Volatility::Immutable,
             BuiltinScalarFunction::Struct => Volatility::Immutable,
+            BuiltinScalarFunction::FromUnixtime => Volatility::Immutable,
 
             // Stable builtin functions
             BuiltinScalarFunction::Now => Volatility::Stable,
@@ -333,6 +336,7 @@ impl FromStr for BuiltinScalarFunction {
             "upper" => BuiltinScalarFunction::Upper,
             "regexp_match" => BuiltinScalarFunction::RegexpMatch,
             "struct" => BuiltinScalarFunction::Struct,
+            "from_unixtime" => BuiltinScalarFunction::FromUnixtime,
             _ => {
                 return Err(DataFusionError::Plan(format!(
                     "There is no built-in function named {}",
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 76bd9a975..abfd37a7c 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -352,6 +352,7 @@ scalar_expr!(DateTrunc, date_trunc, part, date);
 scalar_expr!(ToTimestampMillis, to_timestamp_millis, date);
 scalar_expr!(ToTimestampMicros, to_timestamp_micros, date);
 scalar_expr!(ToTimestampSeconds, to_timestamp_seconds, date);
+scalar_expr!(FromUnixtime, from_unixtime, unixtime);
 
 /// Returns an array of fixed size with each argument on it.
 pub fn array(args: Vec<Expr>) -> Expr {
@@ -601,6 +602,7 @@ mod test {
 
         test_scalar_expr!(DatePart, date_part, part, date);
         test_scalar_expr!(DateTrunc, date_trunc, part, date);
+        test_scalar_expr!(FromUnixtime, from_unixtime, unixtime);
     }
 
     #[test]
diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs
index fc60e9c86..331756f8d 100644
--- a/datafusion/expr/src/function.rs
+++ b/datafusion/expr/src/function.rs
@@ -194,6 +194,9 @@ pub fn return_type(
         BuiltinScalarFunction::ToTimestampSeconds => {
             Ok(DataType::Timestamp(TimeUnit::Second, None))
         }
+        BuiltinScalarFunction::FromUnixtime => {
+            Ok(DataType::Timestamp(TimeUnit::Second, None))
+        }
         BuiltinScalarFunction::Now => Ok(DataType::Timestamp(
             TimeUnit::Nanosecond,
             Some("UTC".to_owned()),
@@ -381,6 +384,9 @@ pub fn signature(fun: &BuiltinScalarFunction) -> Signature {
             ],
             fun.volatility(),
         ),
+        BuiltinScalarFunction::FromUnixtime => {
+            Signature::uniform(1, vec![DataType::Int64], fun.volatility())
+        }
         BuiltinScalarFunction::Digest => {
             Signature::exact(vec![DataType::Utf8, DataType::Utf8], fun.volatility())
         }
diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs
index 75604a532..5f0e711f8 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -152,6 +152,23 @@ pub fn create_physical_expr(
                 }
             }
         }),
+        BuiltinScalarFunction::FromUnixtime => Arc::new({
+            match coerced_phy_exprs[0].data_type(input_schema) {
+                Ok(DataType::Int64) => |col_values: &[ColumnarValue]| {
+                    cast_column(
+                        &col_values[0],
+                        &DataType::Timestamp(TimeUnit::Second, None),
+                        &DEFAULT_DATAFUSION_CAST_OPTIONS,
+                    )
+                },
+                other => {
+                    return Err(DataFusionError::Internal(format!(
+                        "Unsupported data type {:?} for function from_unixtime",
+                        other,
+                    )))
+                }
+            }
+        }),
         // These don't need args and input schema
         _ => create_physical_fun(fun, execution_props)?,
     };
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index d7165c5d3..39c254ea7 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -438,6 +438,7 @@ enum ScalarFunction {
   Coalesce=63;
   Power=64;
   StructFun=65;
+  FromUnixtime=66;
 }
 
 message ScalarFunctionNode {
diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs
index 1cd6edfe8..cb7b11189 100644
--- a/datafusion/proto/src/from_proto.rs
+++ b/datafusion/proto/src/from_proto.rs
@@ -34,7 +34,7 @@ use datafusion_expr::expr::GroupingSet::GroupingSets;
 use datafusion_expr::{
     abs, acos, array, ascii, asin, atan, bit_length, btrim, ceil, character_length, chr,
     coalesce, concat_expr, concat_ws_expr, cos, date_part, date_trunc, digest, exp,
-    floor, left, ln, log10, log2,
+    floor, from_unixtime, left, ln, log10, log2,
     logical_plan::{PlanType, StringifiedPlan},
     lower, lpad, ltrim, md5, now_expr, nullif, octet_length, power, random, regexp_match,
     regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim, sha224, sha256,
@@ -473,6 +473,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
             ScalarFunction::Coalesce => Self::Coalesce,
             ScalarFunction::Power => Self::Power,
             ScalarFunction::StructFun => Self::Struct,
+            ScalarFunction::FromUnixtime => Self::FromUnixtime,
         }
     }
 }
@@ -1128,6 +1129,9 @@ pub fn parse_expr(
                     parse_expr(&args[0], registry)?,
                     parse_expr(&args[1], registry)?,
                 )),
+                ScalarFunction::FromUnixtime => {
+                    Ok(from_unixtime(parse_expr(&args[0], registry)?))
+                }
                 _ => Err(proto_error(
                     "Protobuf deserialization error: Unsupported scalar function",
                 )),
diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs
index d5ba5c38a..fd5276ca8 100644
--- a/datafusion/proto/src/to_proto.rs
+++ b/datafusion/proto/src/to_proto.rs
@@ -1120,6 +1120,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
             BuiltinScalarFunction::Coalesce => Self::Coalesce,
             BuiltinScalarFunction::Power => Self::Power,
             BuiltinScalarFunction::Struct => Self::StructFun,
+            BuiltinScalarFunction::FromUnixtime => Self::FromUnixtime,
         };
 
         Ok(scalar_function)