Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/16 11:02:45 UTC

[GitHub] [arrow] alamb commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

alamb commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r471097019



##########
File path: rust/datafusion/src/execution/physical_plan/udf.rs
##########
@@ -37,8 +37,11 @@ pub type ScalarUdf = Arc<dyn Fn(&[ArrayRef]) -> Result<ArrayRef> + Send + Sync>;
 pub struct ScalarFunction {
     /// Function name
     pub name: String,
-    /// Function argument meta-data
-    pub args: Vec<Field>,
+    /// Set of valid argument types.
+    /// The first dimension (0) represents specific combinations of valid argument types
+    /// The second dimension (1) represents the types of each argument.
+    /// For example, [[t1, t2]] is a function of 2 arguments that only accept t1 on the first arg and t2 on the second
+    pub args: Vec<Vec<DataType>>,

Review comment:
       ```suggestion
       pub arg_types: Vec<Vec<DataType>>,
   ```

##########
File path: rust/datafusion/src/execution/physical_plan/udf.rs
##########
@@ -37,8 +37,11 @@ pub type ScalarUdf = Arc<dyn Fn(&[ArrayRef]) -> Result<ArrayRef> + Send + Sync>;
 pub struct ScalarFunction {
     /// Function name
     pub name: String,
-    /// Function argument meta-data
-    pub args: Vec<Field>,
+    /// Set of valid argument types.
+    /// The first dimension (0) represents specific combinations of valid argument types
+    /// The second dimension (1) represents the types of each argument.
+    /// For example, [[t1, t2]] is a function of 2 arguments that only accept t1 on the first arg and t2 on the second

Review comment:
       I suggest you move this comment to `FunctionMeta` at https://github.com/apache/arrow/pull/7967/files#diff-be9adb93b0effc41a9672892e1aed3e1R50 (which is where someone writing a UDF is more likely to look) and refer to that comment here.
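
To illustrate the layout this comment describes (each inner vector is one complete, valid list of argument types), here is a minimal sketch of a `FunctionMeta`-like struct carrying the moved doc comment. The enum, the struct fields and the `accepts` helper are hypothetical simplifications, not DataFusion's API. Note that the `FunctionMeta` comment in `logicalplan.rs` in this PR describes a different layout (`args[i]` lists the types allowed for argument `i`). This sketch follows the layout used here and by `maybe_rewrite`.

```rust
/// Simplified stand-in for arrow's `DataType` (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    Int64,
    Float32,
    Float64,
}

/// Hypothetical, trimmed-down `FunctionMeta` holding the moved doc comment.
pub struct FunctionMeta {
    /// Function name
    pub name: String,
    /// Set of valid argument types.
    /// Each inner vector is one valid combination (signature), listed in
    /// priority order; its length is the function's arity.
    /// For example, `[[t1, t2]]` is a function of 2 arguments that only
    /// accepts `t1` for the first argument and `t2` for the second.
    pub arg_types: Vec<Vec<DataType>>,
}

impl FunctionMeta {
    /// Returns true if `types` matches one of the signatures exactly
    /// (no coercion is attempted here).
    pub fn accepts(&self, types: &[DataType]) -> bool {
        self.arg_types.iter().any(|sig| sig.as_slice() == types)
    }
}
```

For example, a one-argument `sqrt` that accepts either float type would be declared with `arg_types: vec![vec![Float32], vec![Float64]]`.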

##########
File path: rust/datafusion/src/logicalplan.rs
##########
@@ -75,7 +75,7 @@ impl FunctionMeta {
         &self.name
     }
     /// Getter for the arg list
-    pub fn args(&self) -> &Vec<Field> {
+    pub fn args(&self) -> &Vec<Vec<DataType>> {

Review comment:
       ```suggestion
       pub fn arg_types(&self) -> &Vec<Vec<DataType>> {
   ```

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -198,6 +215,50 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     }
 }
 
+/// tries to re-cast expressions under schema based on the set of valid signatures
+fn maybe_rewrite(
+    expressions: &Vec<Expr>,
+    current_types: &Vec<DataType>,
+    schema: &Schema,
+    signature: &Vec<Vec<DataType>>,
+) -> Result<Option<Vec<Expr>>> {
+    // for each set of valid signatures, try to coerse all expressions to one of them

Review comment:
       ```suggestion
       // for each set of valid signatures, try to coerce all expressions to one of them
   ```

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -198,6 +215,50 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     }
 }
 
+/// tries to re-cast expressions under schema based on the set of valid signatures
+fn maybe_rewrite(
+    expressions: &Vec<Expr>,
+    current_types: &Vec<DataType>,
+    schema: &Schema,
+    signature: &Vec<Vec<DataType>>,
+) -> Result<Option<Vec<Expr>>> {
+    // for each set of valid signatures, try to coerse all expressions to one of them
+    let mut new_expressions: Option<Vec<Expr>> = None;

Review comment:
       ```suggestion
   ```

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -198,6 +215,50 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     }
 }
 
+/// tries to re-cast expressions under schema based on the set of valid signatures
+fn maybe_rewrite(
+    expressions: &Vec<Expr>,
+    current_types: &Vec<DataType>,
+    schema: &Schema,
+    signature: &Vec<Vec<DataType>>,
+) -> Result<Option<Vec<Expr>>> {
+    // for each set of valid signatures, try to coerse all expressions to one of them
+    let mut new_expressions: Option<Vec<Expr>> = None;
+    for valid_types in signature {
+        // for each option, try to coerse all arguments to it
+        if let Some(types) = maybe_coerse(valid_types, &current_types) {
+            // yes: let's re-write the expressions
+            new_expressions = Some(
+                expressions
+                    .iter()
+                    .enumerate()
+                    .map(|(i, expr)| expr.cast_to(&types[i], schema))
+                    .collect::<Result<Vec<_>>>()?,
+            );
+            break;
+        }
+        // we cannot: try the next
+    }
+    Ok(new_expressions)
+}
+
+/// Try to coerse current_types into valid_types
+fn maybe_coerse(
+    valid_types: &Vec<DataType>,
+    current_types: &Vec<DataType>,
+) -> Option<Vec<DataType>> {
+    let mut super_type = Vec::with_capacity(valid_types.len());
+    for (i, valid_type) in valid_types.iter().enumerate() {
+        let current_type = &current_types[i];
+        if let Ok(t) = utils::get_supertype(current_type, valid_type) {
+            super_type.push(t)
+        } else {
+            return None;
+        }
+    }
+    Some(super_type)
+}
+
 #[cfg(test)]

Review comment:
       I think we should have some unit tests of `maybe_coerse` and `maybe_rewrite`. I reviewed the logic and it looks good to me, but tests would guard against regressions (someone breaking the code in the future without realizing it because the tests kept passing) and would also serve as another form of documentation (enumerating expected inputs and outputs).
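
A minimal sketch of such tests, written against simplified stand-ins so that it compiles on its own: `DataType` and `get_supertype` here are hypothetical (the stand-in only widens Int32 to Int64 to Float64 and otherwise requires identical types), not arrow's or DataFusion's. Two cases seem worth pinning down. First, because `maybe_coerse` pushes `get_supertype(current, valid)`, the result can be wider than the declared type when the argument itself is wider (a `Float64` argument against an `Int32` signature, with a supertype function that widens like this stand-in). Second, the reviewed code indexes `current_types[i]` without comparing lengths, so the sketch adds an arity check as an assumption about the intended behavior.

```rust
/// Simplified stand-in for arrow's `DataType` (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    Int32,
    Int64,
    Float64,
    Utf8,
}

/// Hypothetical stand-in for `utils::get_supertype`: numeric types widen
/// along Int32 -> Int64 -> Float64; `Utf8` only matches itself.
pub fn get_supertype(l: DataType, r: DataType) -> Result<DataType, String> {
    fn rank(t: DataType) -> Option<u8> {
        match t {
            DataType::Int32 => Some(0),
            DataType::Int64 => Some(1),
            DataType::Float64 => Some(2),
            DataType::Utf8 => None,
        }
    }
    if l == r {
        return Ok(l);
    }
    match (rank(l), rank(r)) {
        (Some(a), Some(b)) => Ok(if a >= b { l } else { r }),
        _ => Err(format!("no supertype for {:?} and {:?}", l, r)),
    }
}

/// Same logic as the reviewed `maybe_coerse`, plus an arity check.
pub fn maybe_coerce(valid_types: &[DataType], current_types: &[DataType]) -> Option<Vec<DataType>> {
    if valid_types.len() != current_types.len() {
        return None; // assumption: a wrong arity never matches
    }
    valid_types
        .iter()
        .zip(current_types)
        .map(|(valid, current)| get_supertype(*current, *valid).ok())
        .collect()
}

#[cfg(test)]
mod tests {
    use super::DataType::*;
    use super::*;

    #[test]
    fn exact_match_is_kept() {
        assert_eq!(maybe_coerce(&[Int64, Utf8], &[Int64, Utf8]), Some(vec![Int64, Utf8]));
    }

    #[test]
    fn narrower_argument_is_widened() {
        assert_eq!(maybe_coerce(&[Int64], &[Int32]), Some(vec![Int64]));
    }

    #[test]
    fn wider_argument_keeps_its_own_type() {
        // the supertype of (Float64, Int32) is Float64, not the declared Int32
        assert_eq!(maybe_coerce(&[Int32], &[Float64]), Some(vec![Float64]));
    }

    #[test]
    fn incompatible_argument_rejects_signature() {
        assert_eq!(maybe_coerce(&[Int64], &[Utf8]), None);
    }

    #[test]
    fn arity_mismatch_rejects_signature() {
        assert_eq!(maybe_coerce(&[Int64, Int64], &[Int64]), None);
    }
}
```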

##########
File path: rust/datafusion/src/logicalplan.rs
##########
@@ -47,8 +47,8 @@ pub enum FunctionType {
 pub struct FunctionMeta {
     /// Function name
     name: String,
-    /// Function arguments
-    args: Vec<Field>,
+    /// Function arguments. Each argument i can be one of the types of args[i], with respective priority
+    args: Vec<Vec<DataType>>,

Review comment:
       ```suggestion
       arg_types: Vec<Vec<DataType>>,
   ```

##########
File path: rust/datafusion/src/execution/physical_plan/udf.rs
##########
@@ -60,7 +63,7 @@ impl ScalarFunction {
     /// Create a new ScalarFunction
     pub fn new(
         name: &str,
-        args: Vec<Field>,
+        args: Vec<Vec<DataType>>,

Review comment:
       ```suggestion
           arg_types: Vec<Vec<DataType>>,
   ```

##########
File path: rust/datafusion/src/logicalplan.rs
##########
@@ -59,7 +59,7 @@ impl FunctionMeta {
     #[allow(missing_docs)]
     pub fn new(
         name: String,
-        args: Vec<Field>,
+        args: Vec<Vec<DataType>>,

Review comment:
       ```suggestion
           arg_types: Vec<Vec<DataType>>,
   ```

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -198,6 +215,50 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     }
 }
 
+/// tries to re-cast expressions under schema based on the set of valid signatures
+fn maybe_rewrite(
+    expressions: &Vec<Expr>,
+    current_types: &Vec<DataType>,
+    schema: &Schema,
+    signature: &Vec<Vec<DataType>>,
+) -> Result<Option<Vec<Expr>>> {
+    // for each set of valid signatures, try to coerse all expressions to one of them
+    let mut new_expressions: Option<Vec<Expr>> = None;
+    for valid_types in signature {
+        // for each option, try to coerse all arguments to it

Review comment:
       ```suggestion
           // for each option, try to coerce all arguments to it
   ```

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -198,6 +215,50 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     }
 }
 
+/// tries to re-cast expressions under schema based on the set of valid signatures
+fn maybe_rewrite(
+    expressions: &Vec<Expr>,
+    current_types: &Vec<DataType>,
+    schema: &Schema,
+    signature: &Vec<Vec<DataType>>,
+) -> Result<Option<Vec<Expr>>> {
+    // for each set of valid signatures, try to coerse all expressions to one of them
+    let mut new_expressions: Option<Vec<Expr>> = None;
+    for valid_types in signature {
+        // for each option, try to coerse all arguments to it
+        if let Some(types) = maybe_coerse(valid_types, &current_types) {
+            // yes: let's re-write the expressions
+            new_expressions = Some(
+                expressions
+                    .iter()
+                    .enumerate()
+                    .map(|(i, expr)| expr.cast_to(&types[i], schema))
+                    .collect::<Result<Vec<_>>>()?,
+            );
+            break;
+        }
+        // we cannot: try the next
+    }
+    Ok(new_expressions)
+}
+
+/// Try to coerse current_types into valid_types
+fn maybe_coerse(

Review comment:
       ```suggestion
   fn maybe_coerce(
   ```

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -84,29 +84,46 @@ impl<'a> TypeCoercionRule<'a> {
                 args,
                 return_type,
             } => {
-                // cast the inputs of scalar functions to the appropriate type where possible
+                // cast the inputs of scalar functions to the appropriate type
                 match self.scalar_functions.get(name) {
                     Some(func_meta) => {
-                        let mut func_args = Vec::with_capacity(args.len());
-                        for i in 0..args.len() {
-                            let field = &func_meta.args[i];
-                            let expr = self.rewrite_expr(&args[i], schema)?;
-                            let actual_type = expr.get_type(schema)?;
-                            let required_type = field.data_type();
-                            if &actual_type == required_type {
-                                func_args.push(expr)
-                            } else {
-                                let super_type =
-                                    utils::get_supertype(&actual_type, required_type)?;
-                                func_args.push(expr.cast_to(&super_type, schema)?);
-                            }
-                        }
+                        // compute the current types and expressions
+                        let expressions = args
+                            .iter()
+                            .map(|e| self.rewrite_expr(e, schema))
+                            .collect::<Result<Vec<_>>>()?;
+
+                        // compute the current types and expressions

Review comment:
       ```suggestion
   ```

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -198,6 +215,50 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     }
 }
 
+/// tries to re-cast expressions under schema based on the set of valid signatures
+fn maybe_rewrite(
+    expressions: &Vec<Expr>,
+    current_types: &Vec<DataType>,
+    schema: &Schema,
+    signature: &Vec<Vec<DataType>>,
+) -> Result<Option<Vec<Expr>>> {
+    // for each set of valid signatures, try to coerse all expressions to one of them
+    let mut new_expressions: Option<Vec<Expr>> = None;
+    for valid_types in signature {
+        // for each option, try to coerse all arguments to it
+        if let Some(types) = maybe_coerse(valid_types, &current_types) {

Review comment:
       ```suggestion
           if let Some(types) = maybe_coerce(valid_types, &current_types) {
   ```

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -198,6 +215,50 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     }
 }
 
+/// tries to re-cast expressions under schema based on the set of valid signatures
+fn maybe_rewrite(
+    expressions: &Vec<Expr>,
+    current_types: &Vec<DataType>,
+    schema: &Schema,
+    signature: &Vec<Vec<DataType>>,
+) -> Result<Option<Vec<Expr>>> {
+    // for each set of valid signatures, try to coerse all expressions to one of them
+    let mut new_expressions: Option<Vec<Expr>> = None;
+    for valid_types in signature {
+        // for each option, try to coerse all arguments to it
+        if let Some(types) = maybe_coerse(valid_types, &current_types) {
+            // yes: let's re-write the expressions
+            new_expressions = Some(
+                expressions
+                    .iter()
+                    .enumerate()
+                    .map(|(i, expr)| expr.cast_to(&types[i], schema))
+                    .collect::<Result<Vec<_>>>()?,
+            );
+            break;
+        }
+        // we cannot: try the next
+    }
+    Ok(new_expressions)

Review comment:
       ```suggestion
       Ok(None)
   ```
   
   (This is a stylistic suggestion: you can avoid a `mut` variable and just return from the branch where the type coercion worked.)
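
Applied together with the suggestion to replace `new_expressions = Some(` with an early return, the function could look like the sketch below. Because `maybe_rewrite` returns `Result<Option<Vec<Expr>>>`, the early return needs to wrap the value as `Ok(Some(...))` rather than a bare `Some(...)`. `DataType`, `Expr`, `cast_to` and `get_supertype` are hypothetical simplified stand-ins, and the `schema` parameter is dropped because the stand-in expressions carry their own type, so this shows the shape of the refactor rather than DataFusion code.

```rust
/// Simplified stand-in for arrow's `DataType` (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    Int32,
    Int64,
    Float64,
    Utf8,
}

/// Hypothetical stand-in for `utils::get_supertype` (numeric widening only).
fn get_supertype(l: DataType, r: DataType) -> Result<DataType, String> {
    let rank = |t: DataType| match t {
        DataType::Int32 => Some(0),
        DataType::Int64 => Some(1),
        DataType::Float64 => Some(2),
        DataType::Utf8 => None,
    };
    match (rank(l), rank(r)) {
        _ if l == r => Ok(l),
        (Some(a), Some(b)) => Ok(if a >= b { l } else { r }),
        _ => Err(format!("no supertype for {:?} and {:?}", l, r)),
    }
}

/// Hypothetical stand-in for `Expr`: a typed column or a cast.
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Column { name: String, data_type: DataType },
    Cast { expr: Box<Expr>, data_type: DataType },
}

impl Expr {
    fn get_type(&self) -> DataType {
        match self {
            Expr::Column { data_type, .. } | Expr::Cast { data_type, .. } => *data_type,
        }
    }

    /// Stand-in for `Expr::cast_to`: wraps in a cast unless already `data_type`.
    fn cast_to(&self, data_type: &DataType) -> Result<Expr, String> {
        if self.get_type() == *data_type {
            Ok(self.clone())
        } else {
            Ok(Expr::Cast { expr: Box::new(self.clone()), data_type: *data_type })
        }
    }
}

/// Same logic as the reviewed `maybe_coerse` (renamed), plus an arity check.
fn maybe_coerce(valid_types: &[DataType], current_types: &[DataType]) -> Option<Vec<DataType>> {
    if valid_types.len() != current_types.len() {
        return None; // assumption: a wrong arity never matches
    }
    valid_types.iter().zip(current_types).map(|(v, c)| get_supertype(*c, *v).ok()).collect()
}

/// Tries to re-cast `expressions` to the first signature they can be coerced to.
fn maybe_rewrite(
    expressions: &[Expr],
    current_types: &[DataType],
    signature: &[Vec<DataType>],
) -> Result<Option<Vec<Expr>>, String> {
    for valid_types in signature {
        if let Some(types) = maybe_coerce(valid_types, current_types) {
            // the first signature that fits wins: cast and return immediately
            return Ok(Some(
                expressions
                    .iter()
                    .zip(&types)
                    .map(|(expr, t)| expr.cast_to(t))
                    .collect::<Result<Vec<_>, _>>()?,
            ));
        }
        // this signature does not fit: try the next one
    }
    Ok(None)
}
```

Besides dropping the `mut` variable, the early return makes the priority rule explicit: signatures are tried in order and the first one that fits is used.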

##########
File path: rust/datafusion/src/sql/planner.rs
##########
@@ -523,10 +523,14 @@ impl<S: SchemaProvider> SqlToRel<S> {
 
                             let mut safe_args: Vec<Expr> = vec![];
                             for i in 0..rex_args.len() {
-                                safe_args.push(
-                                    rex_args[i]
-                                        .cast_to(fm.args()[i].data_type(), schema)?,
-                                );
+                                let expr = if fm.args()[i]
+                                    .contains(&rex_args[i].get_type(schema)?)
+                                {
+                                    rex_args[i].clone()
+                                } else {
+                                    rex_args[i].cast_to(&fm.args()[i][0], schema)?

Review comment:
       Adding a test for a user-defined function that takes no arguments would probably be good; an example of such a function might be `rand()`.
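
A minimal sketch of such a test at the signature-matching level, using the signature-list layout that `maybe_rewrite` iterates over. Under that layout a zero-argument function such as `rand()` has to be declared as `vec![vec![]]` (one signature with no arguments), while `vec![]` means no signature can ever match. Under the per-argument layout described in the `FunctionMeta` comment, `vec![]` would be the natural declaration, which is exactly the kind of mismatch this test could catch. `DataType` and `find_signature` are hypothetical, and exact matching stands in for coercion to keep the sketch short.

```rust
/// Simplified stand-in for arrow's `DataType` (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    Int64,
    Float64,
}

/// Hypothetical helper: returns the first signature whose types match
/// `current_types` exactly (coercion is left out to keep the sketch short).
pub fn find_signature(
    signature: &[Vec<DataType>],
    current_types: &[DataType],
) -> Option<Vec<DataType>> {
    signature
        .iter()
        .find(|valid_types| valid_types.as_slice() == current_types)
        .cloned()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn zero_arg_function_matches_empty_signature() {
        // rand(): one valid signature that takes no arguments
        let rand_signature: Vec<Vec<DataType>> = vec![vec![]];
        assert_eq!(find_signature(&rand_signature, &[]), Some(vec![]));
    }

    #[test]
    fn empty_signature_list_matches_nothing() {
        let no_signatures: Vec<Vec<DataType>> = vec![];
        assert_eq!(find_signature(&no_signatures, &[]), None);
    }

    #[test]
    fn zero_arg_function_rejects_arguments() {
        let rand_signature: Vec<Vec<DataType>> = vec![vec![]];
        assert_eq!(find_signature(&rand_signature, &[DataType::Float64]), None);
    }
}
```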

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -198,6 +215,50 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     }
 }
 
+/// tries to re-cast expressions under schema based on the set of valid signatures
+fn maybe_rewrite(
+    expressions: &Vec<Expr>,
+    current_types: &Vec<DataType>,
+    schema: &Schema,
+    signature: &Vec<Vec<DataType>>,
+) -> Result<Option<Vec<Expr>>> {
+    // for each set of valid signatures, try to coerse all expressions to one of them
+    let mut new_expressions: Option<Vec<Expr>> = None;
+    for valid_types in signature {
+        // for each option, try to coerse all arguments to it
+        if let Some(types) = maybe_coerse(valid_types, &current_types) {
+            // yes: let's re-write the expressions
+            new_expressions = Some(
+                expressions
+                    .iter()
+                    .enumerate()
+                    .map(|(i, expr)| expr.cast_to(&types[i], schema))
+                    .collect::<Result<Vec<_>>>()?,
+            );
+            break;

Review comment:
       ```suggestion
   ```

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -198,6 +215,50 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     }
 }
 
+/// tries to re-cast expressions under schema based on the set of valid signatures
+fn maybe_rewrite(
+    expressions: &Vec<Expr>,
+    current_types: &Vec<DataType>,
+    schema: &Schema,
+    signature: &Vec<Vec<DataType>>,
+) -> Result<Option<Vec<Expr>>> {
+    // for each set of valid signatures, try to coerse all expressions to one of them
+    let mut new_expressions: Option<Vec<Expr>> = None;
+    for valid_types in signature {
+        // for each option, try to coerse all arguments to it
+        if let Some(types) = maybe_coerse(valid_types, &current_types) {
+            // yes: let's re-write the expressions
+            new_expressions = Some(

Review comment:
       ```suggestion
               return Some(
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org