You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/11/02 07:22:07 UTC

[arrow-datafusion] branch master updated: add uuid() function to return a unique uuid per row (#4041)

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ec15a401 add uuid() function to return a unique uuid per row (#4041)
2ec15a401 is described below

commit 2ec15a4010052fc7754ae2a4d86eb21019ab5184
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Wed Nov 2 15:22:00 2022 +0800

    add uuid() function to return a unique uuid per row (#4041)
---
 datafusion-cli/Cargo.lock                          |  1 +
 datafusion/core/tests/sql/expr.rs                  | 10 ++++++++++
 datafusion/expr/src/built_in_function.rs           |  5 +++++
 datafusion/expr/src/expr_fn.rs                     | 21 ++++++++++++++++++++-
 datafusion/expr/src/function.rs                    |  2 ++
 datafusion/physical-expr/Cargo.toml                |  1 +
 datafusion/physical-expr/src/functions.rs          |  7 ++++++-
 datafusion/physical-expr/src/string_expressions.rs | 19 +++++++++++++++++++
 datafusion/proto/proto/datafusion.proto            |  1 +
 datafusion/proto/src/from_proto.rs                 |  6 ++++--
 datafusion/proto/src/generated/pbjson.rs           |  3 +++
 datafusion/proto/src/generated/prost.rs            |  2 ++
 datafusion/proto/src/to_proto.rs                   |  1 +
 13 files changed, 75 insertions(+), 4 deletions(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index db724780e..a693295e9 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -662,6 +662,7 @@ dependencies = [
  "regex",
  "sha2",
  "unicode-segmentation",
+ "uuid",
 ]
 
 [[package]]
diff --git a/datafusion/core/tests/sql/expr.rs b/datafusion/core/tests/sql/expr.rs
index b7e685592..8251a5546 100644
--- a/datafusion/core/tests/sql/expr.rs
+++ b/datafusion/core/tests/sql/expr.rs
@@ -1179,6 +1179,16 @@ async fn test_random_expression() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn test_uuid_expression() -> Result<()> {
+    let ctx = create_ctx();
+    let sql = "SELECT uuid()";
+    let actual = execute(&ctx, sql).await;
+    let uuid = actual[0][0].parse::<uuid::Uuid>().unwrap(); // the value at [0][0] must parse as a UUID
+    assert_eq!(uuid.get_version_num(), 4); // uuid() is expected to produce random (version 4) UUIDs
+    Ok(())
+}
+
 #[tokio::test]
 async fn case_with_bool_type_result() -> Result<()> {
     let ctx = SessionContext::new();
diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs
index c8e144718..d78d38262 100644
--- a/datafusion/expr/src/built_in_function.rs
+++ b/datafusion/expr/src/built_in_function.rs
@@ -166,6 +166,8 @@ pub enum BuiltinScalarFunction {
     Trim,
     /// upper
     Upper,
+    /// uuid
+    Uuid,
     /// regexp_match
     RegexpMatch,
     /// struct
@@ -184,6 +186,7 @@ impl BuiltinScalarFunction {
                 | BuiltinScalarFunction::Now
                 | BuiltinScalarFunction::CurrentDate
                 | BuiltinScalarFunction::CurrentTime
+                | BuiltinScalarFunction::Uuid
         )
     }
     /// Returns the [Volatility] of the builtin function.
@@ -266,6 +269,7 @@ impl BuiltinScalarFunction {
 
             // Volatile builtin functions
             BuiltinScalarFunction::Random => Volatility::Volatile,
+            BuiltinScalarFunction::Uuid => Volatility::Volatile,
         }
     }
 }
@@ -358,6 +362,7 @@ impl FromStr for BuiltinScalarFunction {
             "translate" => BuiltinScalarFunction::Translate,
             "trim" => BuiltinScalarFunction::Trim,
             "upper" => BuiltinScalarFunction::Upper,
+            "uuid" => BuiltinScalarFunction::Uuid,
             "regexp_match" => BuiltinScalarFunction::RegexpMatch,
             "struct" => BuiltinScalarFunction::Struct,
             "from_unixtime" => BuiltinScalarFunction::FromUnixtime,
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 006bcac5e..f68690588 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -438,7 +438,7 @@ pub fn array(args: Vec<Expr>) -> Expr {
 /// which is not NULL
 pub fn coalesce(args: Vec<Expr>) -> Expr {
     Expr::ScalarFunction {
-        fun: built_in_function::BuiltinScalarFunction::Coalesce,
+        fun: BuiltinScalarFunction::Coalesce,
         args,
     }
 }
@@ -460,6 +460,14 @@ pub fn current_date() -> Expr {
     }
 }
 
+/// Returns a random (v4) uuid per row as a [`DataType::Utf8`] string value
+pub fn uuid() -> Expr {
+    Expr::ScalarFunction {
+        fun: BuiltinScalarFunction::Uuid,
+        args: vec![],
+    }
+}
+
 /// Returns current UTC time as a [`DataType::Time64`] value
 pub fn current_time() -> Expr {
     Expr::ScalarFunction {
@@ -684,6 +692,17 @@ mod test {
         test_unary_scalar_expr!(ArrowTypeof, arrow_typeof);
     }
 
+    #[test]
+    fn uuid_function_definitions() {
+        if let Expr::ScalarFunction { fun, args } = uuid() {
+            let name = built_in_function::BuiltinScalarFunction::Uuid;
+            assert_eq!(name, fun);
+            assert_eq!(0, args.len()); // uuid() is built with an empty argument list
+        } else {
+            unreachable!(); // uuid() always builds an Expr::ScalarFunction
+        }
+    }
+
     #[test]
     fn digest_function_definitions() {
         if let Expr::ScalarFunction { fun, args } = digest(col("tableA.a"), lit("md5")) {
diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs
index de5ab0e1a..515cce980 100644
--- a/datafusion/expr/src/function.rs
+++ b/datafusion/expr/src/function.rs
@@ -157,6 +157,7 @@ pub fn return_type(
             utf8_to_int_type(&input_expr_types[0], "octet_length")
         }
         BuiltinScalarFunction::Random => Ok(DataType::Float64),
+        BuiltinScalarFunction::Uuid => Ok(DataType::Utf8),
         BuiltinScalarFunction::RegexpReplace => {
             utf8_to_str_type(&input_expr_types[0], "regex_replace")
         }
@@ -580,6 +581,7 @@ pub fn signature(fun: &BuiltinScalarFunction) -> Signature {
             fun.volatility(),
         ),
         BuiltinScalarFunction::Random => Signature::exact(vec![], fun.volatility()),
+        BuiltinScalarFunction::Uuid => Signature::exact(vec![], fun.volatility()),
         BuiltinScalarFunction::Power => Signature::one_of(
             vec![
                 TypeSignature::Exact(vec![DataType::Int64, DataType::Int64]),
diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml
index d281b9543..a7482072b 100644
--- a/datafusion/physical-expr/Cargo.toml
+++ b/datafusion/physical-expr/Cargo.toml
@@ -57,3 +57,4 @@ rand = "0.8"
 regex = { version = "^1.4.3", optional = true }
 sha2 = { version = "^0.10.1", optional = true }
 unicode-segmentation = { version = "^1.7.1", optional = true }
+uuid = { version = "^1.2", features = ["v4"] }
diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs
index cd1d31544..000991d07 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -760,6 +760,7 @@ pub fn create_physical_fun(
             ))),
         }),
         BuiltinScalarFunction::Upper => Arc::new(string_expressions::upper),
+        BuiltinScalarFunction::Uuid => Arc::new(string_expressions::uuid),
         _ => {
             return Err(DataFusionError::Internal(format!(
                 "create_physical_fun: Unsupported scalar function {:?}",
@@ -2760,7 +2761,11 @@ mod tests {
         let execution_props = ExecutionProps::new();
         let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
 
-        let funs = [BuiltinScalarFunction::Now, BuiltinScalarFunction::Random];
+        let funs = [
+            BuiltinScalarFunction::Now,
+            BuiltinScalarFunction::Random,
+            BuiltinScalarFunction::Uuid,
+        ];
 
         for fun in funs.iter() {
             create_physical_expr_with_type_coercion(fun, &[], &schema, &execution_props)?;
diff --git a/datafusion/physical-expr/src/string_expressions.rs b/datafusion/physical-expr/src/string_expressions.rs
index c13a853bb..5733234f8 100644
--- a/datafusion/physical-expr/src/string_expressions.rs
+++ b/datafusion/physical-expr/src/string_expressions.rs
@@ -32,7 +32,9 @@ use datafusion_common::ScalarValue;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::ColumnarValue;
 use std::any::type_name;
+use std::iter;
 use std::sync::Arc;
+use uuid::Uuid;
 
 macro_rules! downcast_string_arg {
     ($ARG:expr, $NAME:expr, $T:ident) => {{
@@ -586,3 +588,20 @@ where
 pub fn upper(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     handle(args, |string| string.to_ascii_uppercase(), "upper")
 }
+
+/// Returns one random (v4) uuid string per row; the row count is the length of the array in `args[0]`
+/// uuid() = 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'
+pub fn uuid(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    let len: usize = match args.first() { // `first()`: empty `args` hits the error arm, no panic
+        Some(ColumnarValue::Array(array)) => array.len(), // only the length is read, never the values
+        _ => {
+            return Err(DataFusionError::Internal(
+                "Expect uuid function to take no param".to_string(),
+            ))
+        }
+    };
+
+    let values = iter::repeat_with(|| Uuid::new_v4().to_string()).take(len);
+    let array = GenericStringArray::<i32>::from_iter_values(values);
+    Ok(ColumnarValue::Array(Arc::new(array)))
+}
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 083f24502..3cb9763d3 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -497,6 +497,7 @@ enum ScalarFunction {
   ArrowTypeof=69;
   CurrentDate=70;
   CurrentTime=71;
+  Uuid=72;
 }
 
 message ScalarFunctionNode {
diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs
index 708fb51ec..775acc3a0 100644
--- a/datafusion/proto/src/from_proto.rs
+++ b/datafusion/proto/src/from_proto.rs
@@ -41,8 +41,8 @@ use datafusion_expr::{
     regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim, sha224, sha256,
     sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr,
     substring, tan, to_hex, to_timestamp_micros, to_timestamp_millis,
-    to_timestamp_seconds, translate, trim, trunc, upper, AggregateFunction, Between,
-    BuiltInWindowFunction, BuiltinScalarFunction, Case, Expr, GetIndexedField,
+    to_timestamp_seconds, translate, trim, trunc, upper, uuid, AggregateFunction,
+    Between, BuiltInWindowFunction, BuiltinScalarFunction, Case, Expr, GetIndexedField,
     GroupingSet,
     GroupingSet::GroupingSets,
     Like, Operator, WindowFrame, WindowFrameBound, WindowFrameUnits,
@@ -431,6 +431,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
             ScalarFunction::Now => Self::Now,
             ScalarFunction::CurrentDate => Self::CurrentDate,
             ScalarFunction::CurrentTime => Self::CurrentTime,
+            ScalarFunction::Uuid => Self::Uuid,
             ScalarFunction::Translate => Self::Translate,
             ScalarFunction::RegexpMatch => Self::RegexpMatch,
             ScalarFunction::Coalesce => Self::Coalesce,
@@ -970,6 +971,7 @@ pub fn parse_expr(
                     parse_expr(&args[1], registry)?,
                 )),
                 ScalarFunction::Random => Ok(random()),
+                ScalarFunction::Uuid => Ok(uuid()),
                 ScalarFunction::Repeat => Ok(repeat(
                     parse_expr(&args[0], registry)?,
                     parse_expr(&args[1], registry)?,
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 3557dee46..2aee09ab3 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -9622,6 +9622,7 @@ impl serde::Serialize for ScalarFunction {
             Self::ArrowTypeof => "ArrowTypeof",
             Self::CurrentDate => "CurrentDate",
             Self::CurrentTime => "CurrentTime",
+            Self::Uuid => "Uuid",
         };
         serializer.serialize_str(variant)
     }
@@ -9705,6 +9706,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
             "ArrowTypeof",
             "CurrentDate",
             "CurrentTime",
+            "Uuid",
         ];
 
         struct GeneratedVisitor;
@@ -9819,6 +9821,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
                     "ArrowTypeof" => Ok(ScalarFunction::ArrowTypeof),
                     "CurrentDate" => Ok(ScalarFunction::CurrentDate),
                     "CurrentTime" => Ok(ScalarFunction::CurrentTime),
+                    "Uuid" => Ok(ScalarFunction::Uuid),
                     _ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
                 }
             }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index d404f0275..4faa08fec 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1226,6 +1226,7 @@ pub enum ScalarFunction {
     ArrowTypeof = 69,
     CurrentDate = 70,
     CurrentTime = 71,
+    Uuid = 72,
 }
 impl ScalarFunction {
     /// String value of the enum field names used in the ProtoBuf definition.
@@ -1306,6 +1307,7 @@ impl ScalarFunction {
             ScalarFunction::ArrowTypeof => "ArrowTypeof",
             ScalarFunction::CurrentDate => "CurrentDate",
             ScalarFunction::CurrentTime => "CurrentTime",
+            ScalarFunction::Uuid => "Uuid",
         }
     }
 }
diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs
index 1aea38f3a..44f27429a 100644
--- a/datafusion/proto/src/to_proto.rs
+++ b/datafusion/proto/src/to_proto.rs
@@ -1164,6 +1164,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
             BuiltinScalarFunction::Left => Self::Left,
             BuiltinScalarFunction::Lpad => Self::Lpad,
             BuiltinScalarFunction::Random => Self::Random,
+            BuiltinScalarFunction::Uuid => Self::Uuid,
             BuiltinScalarFunction::RegexpReplace => Self::RegexpReplace,
             BuiltinScalarFunction::Repeat => Self::Repeat,
             BuiltinScalarFunction::Replace => Self::Replace,