Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/15 10:57:57 UTC

[GitHub] [arrow] jorgecarleitao opened a new pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

jorgecarleitao opened a new pull request #7967:
URL: https://github.com/apache/arrow/pull/7967


   





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r484642205



##########
File path: rust/datafusion/src/execution/dataframe_impl.rs
##########
@@ -232,6 +241,50 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn registry() -> Result<()> {
+        let mut ctx = ExecutionContext::new();
+        register_aggregate_csv(&mut ctx)?;
+
+        // declare the udf
+        let my_add: ScalarFunctionImplementation = Arc::new(|args: &[ArrayRef]| {

Review comment:
       Well spotted. Thanks!







[GitHub] [arrow] github-actions[bot] commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-674383209


   https://issues.apache.org/jira/browse/ARROW-9751





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r483428265



##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -368,32 +368,22 @@ impl DefaultPhysicalPlanner {
                     .collect::<Result<Vec<_>>>()?;
                 functions::create_physical_expr(fun, &physical_args, input_schema)
             }
-            Expr::ScalarUDF {
-                name,
-                args,
-                return_type,
-            } => match ctx_state.scalar_functions.get(name) {
-                Some(f) => {
-                    let mut physical_args = vec![];
-                    for e in args {
-                        physical_args.push(self.create_physical_expr(
-                            e,
-                            input_schema,
-                            ctx_state,
-                        )?);
-                    }
-                    Ok(Arc::new(ScalarFunctionExpr::new(
-                        name,
-                        f.fun.clone(),
-                        physical_args,
-                        return_type,
-                    )))
+            Expr::ScalarUDF { fun, args } => {
+                let mut physical_args = vec![];
+                for e in args {
+                    physical_args.push(self.create_physical_expr(
+                        e,
+                        input_schema,
+                        ctx_state,
+                    )?);
                 }
-                _ => Err(ExecutionError::General(format!(
-                    "Invalid scalar function '{:?}'",
-                    name
-                ))),
-            },
+
+                udf::create_physical_expr(

Review comment:
       This is where casting would happen for UDFs under this PR, during physical planning.
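
   A sketch of the idea (illustrative only, not the PR's exact code): during physical planning, each argument whose type differs from the signature's expected type gets wrapped in a cast. A toy model that just records where casts are needed:

   ```
   use arrow::datatypes::DataType;

   /// Toy model: return the positions (and target types) where a cast must be
   /// inserted; the real planner wraps those arguments in cast expressions.
   fn casts_needed(actual: &[DataType], expected: &[DataType]) -> Vec<(usize, DataType)> {
       let mut casts = Vec::new();
       for (i, (a, e)) in actual.iter().zip(expected.iter()).enumerate() {
           if a != e {
               casts.push((i, e.clone()));
           }
       }
       casts
   }
   ```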







[GitHub] [arrow] alamb commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-683713264


   # D
   I think in any UDF implementation, the UDF author will be required to ensure their physical expression implementation matches the type information reported to the DataFusion logical planners. 





[GitHub] [arrow] jorgecarleitao commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-674409587


   @andygrove and @alamb 





[GitHub] [arrow] jorgecarleitao commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-674484413


   > > The alternative is for `[[t1, t2]]` to represent one valid set of types for a function of two arguments, instead of `[[t1], [t2]]`. This way we can constrain arguments together. We would need to check that each entry on the list has the same number of entries as the function has arguments.
   > 
   > This seems reasonable to me.
   
   @andygrove , I have re-factored it to use this approach instead.
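
   For illustration, under the new encoding a function whose two arguments must both be f64 or both be f32 (a hypothetical example) would declare:

   ```
   // each inner vec is one valid combination of ALL argument types
   let valid_types: Vec<Vec<DataType>> = vec![
       vec![DataType::Float64, DataType::Float64], // (f64, f64)
       vec![DataType::Float32, DataType::Float32], // (f32, f32)
   ];
   // the per-argument form [[f32, f64], [f32, f64]] could not express
   // that both arguments must have the same type
   ```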





[GitHub] [arrow] andygrove commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-683301870


   Thanks @alamb for that write-up of options. I am short on time this weekend due to work commitments but here are my thoughts so far:
   
   Option 1 seems to add complexity in the internal implementation.
   
   Option 2 is the closest to what Spark and other query engines do and is the simplest to implement. As a side note, when you register a Scala UDF in Spark, you do not specify a return type. It is always `Any`, which does not seem like a good design to me.
   
   Option 3 seems like the most elegant way to achieve Jorge's requirement and is a great example of the difference between a logical plan and a physical plan. At first, I had a concern about the logical and physical plans ending up with different schemas, but I suppose users would only have this schema difference if they choose to register these types of dynamic functions in the first place, so I think that alleviates my concerns here.
   
   





[GitHub] [arrow] jorgecarleitao commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-688939013


   @andygrove , done :)





[GitHub] [arrow] alamb commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r471442333



##########
File path: rust/datafusion/src/sql/planner.rs
##########
@@ -523,10 +523,14 @@ impl<S: SchemaProvider> SqlToRel<S> {
 
                             let mut safe_args: Vec<Expr> = vec![];
                             for i in 0..rex_args.len() {
-                                safe_args.push(
-                                    rex_args[i]
-                                        .cast_to(fm.args()[i].data_type(), schema)?,
-                                );
+                                let expr = if fm.args()[i]
+                                    .contains(&rex_args[i].get_type(schema)?)
+                                {
+                                    rex_args[i].clone()
+                                } else {
+                                    rex_args[i].cast_to(&fm.args()[i][0], schema)?

Review comment:
       makes sense -- I guess I would suggest we add a test to ensure the system behaves reasonably (aka generates an error rather than a crash) with 0-argument UDFs -- but if there is something challenging about implementing them, it doesn't have to be done in this PR
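
   Something along these lines, perhaps (a sketch; `my_sqrt` and the registration step are placeholders):

   ```
   #[test]
   fn zero_argument_udf_errors() -> Result<()> {
       let mut ctx = ExecutionContext::new();
       // ... register a one-argument UDF named "my_sqrt" here ...

       // calling it with no arguments should be a planning error, not a crash
       let plan = ctx.create_logical_plan("SELECT my_sqrt() FROM t");
       assert!(plan.is_err());
       Ok(())
   }
   ```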







[GitHub] [arrow] alamb commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-679075478


   I will try and review this carefully later today sometime -- I am on
   vacation this week with my family so my responses will likely be delayed
   compared to normal (not that I have been as prompt as you have been anyways
   :) )
   
   On Sun, Aug 23, 2020 at 9:32 AM Jorge Leitao <no...@github.com>
   wrote:
   
   > The code you pointed to reads return_type: DataType. I will assume you
   > mean the return type declared in Expr::ScalarFunctions.
   >
   > Two minds thinking alike: I was just trying to do that in the codebase.
   >
   > Unfortunately, I do not think that that is sufficient 😞 : when a
   > projection is declared, we need to resolve its schema's type, which we do
   > via Expr::get_type. If we do not have the UDF's return_type on
   > Expr::ScalarFunction, we can't know its return type, which means we can't
   > even project (even before optimizations).
   >
   > But to get the UDF's DataType, we need to access the UDF's registry. What
   > we currently do is let the user decide the DataType for us in the logical
   > plane via the call scalar_function("name", vec![args..], DATATYPE).
   > Unfortunately, this means that the user needs to know the return type of
   > the UDF, or it will all break during planning, when the physical plan has
   > nothing to do with the logical one. I would prefer that the user does not
   > have to have this burden: it registers a UDF with the type, and then just
   > plans a call without its return type, during planning.
   >
   > I am formalizing a proposal to address this. The gist is that we can't
   > have "meta" of UDFs in the logical plan: they need to know their return
   > type, which means that we need to access the registry during planning.
   >
   > I am developing some ideas for this here
   > <https://docs.google.com/document/d/1Kzz642ScizeKXmVE1bBlbLvR663BKQaGqVIyy9cAscY/edit?usp=sharing>
   > .
   >
   > —
   > You are receiving this because you were mentioned.
   > Reply to this email directly, view it on GitHub
   > <https://github.com/apache/arrow/pull/7967#issuecomment-678781518>, or
   > unsubscribe
   > <https://github.com/notifications/unsubscribe-auth/AADXZMOSF2MK7Z5G5LHGZXTSCER7LANCNFSM4QAG5BEA>
   > .
   >
   





[GitHub] [arrow] jorgecarleitao commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-674413020


   > @jorgecarleitao This looks good but I have one concern/question. What happens if I declare a function with two args and both args can be Float32 or Float64, but when I call it, one arg is i8 and the other arg is f64? Would they both get cast to f64 in this case? or would one be cast to f32 and the other to f64? The latter would not work if the function requires both types to be the same.
   
   Great point! I had not thought about those types of constraints so far.
   
   This currently does not have constraints of the form "arguments have to be the same"; it assumes that the function handles each variation of the arguments independently: in your example, they would be coerced to f32 and f64 respectively. We can expand it to include such constraints, with `get_supertype` taking them into account.
   
   The alternative is for `[[t1, t2]]` to represent one valid set of types for a function of two arguments, instead of `[[t1], [t2]]`. This way we can constrain arguments together. We would need to check that each entry on the list has the same number of entries as the function has arguments.





[GitHub] [arrow] alamb commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-678766915


    @jorgecarleitao  -- another thing I can think of would be to postpone the UDF resolution until the type coercion logical optimizer pass.
   
   So in other words, when converting `SQL` --> `LogicalPlan` we would postpone populating `data_type` in `ScalarFunction` (or maybe have a `ScalarFunctionPlaceholder` or something) that would not have a `data_type`. Then we would add something to the type coercion logic that would resolve `ScalarFunctionPlaceholder` to `ScalarFunction`.
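
   A sketch of what those variants could look like (hypothetical names, trimmed to the two variants in question):

   ```
   use arrow::datatypes::DataType;

   pub enum Expr {
       // fully resolved: the return type is known
       ScalarFunction {
           name: String,
           args: Vec<Expr>,
           return_type: DataType,
       },
       // produced by the SQL planner; carries no return type until the
       // type-coercion optimizer pass resolves it against the registry
       ScalarFunctionPlaceholder {
           name: String,
           args: Vec<Expr>,
       },
   }
   ```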





[GitHub] [arrow] alamb commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-678780754


   > When you mean data_type you mean the arguments' types or the return_type?
   
   
   I was referring to `Expr::ScalarFunction::data_type`:
   
   https://github.com/apache/arrow/blob/master/rust/datafusion/src/logicalplan.rs#L323
   





[GitHub] [arrow] jorgecarleitao commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-678781518


   The code you pointed to reads `return_type: DataType`. I will assume you mean the return type declared in `Expr::ScalarFunctions`.
   
   Two minds thinking alike: I was just trying to do that in the codebase.
   
   Unfortunately, I do not think that that is sufficient 😞 : when a projection is declared, we need to resolve its schema's type, which we do via `Expr::get_type`. If we do not have the UDF's `return_type` on `Expr::ScalarFunction`, we can't know its return type, which means we can't even project (even before optimizations).
   
   But to get the UDF's `DataType`, we need to access the UDF's registry. What we currently do is let the user decide the `DataType` for us in the logical plane via the call `scalar_function("name", vec![args..], DATATYPE)`. Unfortunately, this means that the user needs to know the return type of the UDF, or it will all break during planning, when the physical plan has nothing to do with the logical one. I would prefer that the user does not have to have this burden: it registers a UDF with the type, and then just plans a call without its return type, during planning.
   
   I am formalizing a proposal to address this. The gist is that we can't have "meta" of UDFs in the logical plan: they need to know their return type, which means that we need to access the registry during planning.
   
   I am developing some ideas for this [here](https://docs.google.com/document/d/1Kzz642ScizeKXmVE1bBlbLvR663BKQaGqVIyy9cAscY/edit?usp=sharing).
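
   (For reference, the shape this converges to in this PR is to let the logical expression hold the UDF itself, so the plan can always resolve the return type; simplified stand-ins below, not the exact code:)

   ```
   use std::sync::Arc;
   use arrow::datatypes::DataType;

   pub struct ScalarUDF {
       pub name: String,
       pub return_type: DataType,
   }

   pub enum Expr {
       // the expression owns a handle to the UDF, so `get_type` can return
       // `fun.return_type` without the user restating it at call time
       ScalarUDF { fun: Arc<ScalarUDF>, args: Vec<Expr> },
   }
   ```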





[GitHub] [arrow] andygrove commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-674563365


   > I think that the logic of get_supertype is not entirely correct atm (e.g. utf8 can be converted to all types), but we have an issue tracking that.
   
   Yes, it is bad code and quite incorrect in some cases and needs re-doing. I have had a few failed attempts at this.





[GitHub] [arrow] alamb commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r471445155



##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -345,4 +345,143 @@ mod tests {
 
         assert_eq!(expected, format!("{:?}", expr2));
     }
+
+    #[test]
+    fn test_maybe_coerce() -> Result<()> {
+        // this vec contains: arg1, arg2, expected result
+        let cases = vec![
+            // 2 entries, same values
+            (
+                vec![DataType::UInt8, DataType::UInt16],
+                vec![DataType::UInt8, DataType::UInt16],
+                Some(vec![DataType::UInt8, DataType::UInt16]),
+            ),
+            // 2 entries, can coerce values
+            (
+                vec![DataType::UInt16, DataType::UInt16],
+                vec![DataType::UInt8, DataType::UInt16],
+                Some(vec![DataType::UInt16, DataType::UInt16]),
+            ),
+            // 0 entries, all good
+            (vec![], vec![], Some(vec![])),
+            // 2 entries, can't coerce
+            (
+                vec![DataType::Boolean, DataType::UInt16],
+                vec![DataType::UInt8, DataType::UInt16],
+                None,
+            ),
+            // u32 -> u16 is possible
+            (
+                vec![DataType::Boolean, DataType::UInt32],
+                vec![DataType::Boolean, DataType::UInt16],
+                Some(vec![DataType::Boolean, DataType::UInt32]),
+            ),
+        ];
+
+        for case in cases {
+            assert_eq!(maybe_coerce(&case.0, &case.1), case.2)
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_maybe_rewrite() -> Result<()> {
+        // create a schema
+        let schema = |t: Vec<DataType>| {
+            Schema::new(
+                t.iter()
+                    .enumerate()
+                    .map(|(i, t)| Field::new(&*format!("c{}", i), t.clone(), true))
+                    .collect(),
+            )
+        };
+
+        // create a vector of expressions
+        let expressions = |t: Vec<DataType>, schema| -> Result<Vec<Expr>> {
+            t.iter()
+                .enumerate()
+                .map(|(i, t)| col(&*format!("c{}", i)).cast_to(&t, &schema))

Review comment:
       ```suggestion
                   .map(|(i, t)| col(&format!("c{}", i)).cast_to(&t, &schema))
   ```

##########
File path: rust/datafusion/src/sql/planner.rs
##########
@@ -515,27 +515,29 @@ impl<S: SchemaProvider> SqlToRel<S> {
                     }
                     _ => match self.schema_provider.get_function_meta(&name) {
                         Some(fm) => {
-                            let rex_args = function

Review comment:
       I wonder, given the emphasis on pluggable planners, whether 52218c852b7b3016afeaf95d8a46d6deea89d231 (removing the type coercion from the physical planner) is a good idea. As in: "is it ok to assume that all plans went through the existing `Optimizer` passes before being converted?"
   
   It seems reasonable to me, but it might be worth mentioning somewhere (e.g. on the physical planner, etc.)

##########
File path: rust/datafusion/tests/sql.rs
##########
@@ -232,6 +326,55 @@ fn custom_sqrt(args: &[ArrayRef]) -> Result<ArrayRef> {
     Ok(Arc::new(builder.finish()))
 }
 
+fn custom_add(args: &[ArrayRef]) -> Result<ArrayRef> {
+    match (args[0].data_type(), args[1].data_type()) {
+        (DataType::Float64, DataType::Float64) => {
+            let input1 = &args[0]
+                .as_any()
+                .downcast_ref::<Float64Array>()
+                .expect("cast failed");
+            let input2 = &args[1]
+                .as_any()
+                .downcast_ref::<Float64Array>()
+                .expect("cast failed");
+
+            let mut builder = Float64Builder::new(input1.len());
+            for i in 0..input1.len() {
+                if input1.is_null(i) || input2.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(input1.value(i) + input2.value(i))?;
+                }
+            }
+            Ok(Arc::new(builder.finish()))
+        }
+        (DataType::Float32, DataType::Float32) => {
+            // this case returns a constant vector (just to be different)
+            let mut builder = Float64Builder::new(args[0].len());
+            for _ in 0..args[0].len() {
+                builder.append_value(3232.0)?;
+            }
+            Ok(Arc::new(builder.finish()))
+        }
+        (DataType::Float32, DataType::Float64) => {
+            // this case returns a constant vector (just to be different)
+            let mut builder = Float64Builder::new(args[0].len());
+            for _ in 0..args[0].len() {
+                builder.append_value(3264.0)?;
+            }
+            Ok(Arc::new(builder.finish()))
+        }
+        (_, _) => {
+            // all other cases return a constant vector (just to be different)

Review comment:
       maybe it is worth a `panic!` if the argument types don't match the registration of the UDF

##########
File path: rust/datafusion/src/execution/physical_plan/udf.rs
##########
@@ -37,8 +37,11 @@ pub type ScalarUdf = Arc<dyn Fn(&[ArrayRef]) -> Result<ArrayRef> + Send + Sync>;
 pub struct ScalarFunction {
     /// Function name
     pub name: String,
-    /// Function argument meta-data
-    pub args: Vec<Field>,
+    /// Set of valid argument types.
+    /// The first dimension (0) represents specific combinations of valid argument types
+    /// The second dimension (1) represents the types of each argument.
+    /// For example, [[t1, t2]] is a function of 2 arguments that only accepts t1 as the first argument and t2 as the second

Review comment:
       makes sense







[GitHub] [arrow] jorgecarleitao edited a comment on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao edited a comment on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-679817828


   I am reverting this PR back to draft, as IMO we first need to differentiate between UDFs and built-in functions, before we can even consider expanding the pool of available data types to UDFs. See discussions on PR #8032





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r471146570



##########
File path: rust/datafusion/src/execution/physical_plan/math_expressions.rs
##########
@@ -20,36 +20,62 @@
 use crate::error::ExecutionError;
 use crate::execution::physical_plan::udf::ScalarFunction;
 
-use arrow::array::{Array, ArrayRef, Float64Array, Float64Builder};
-use arrow::datatypes::{DataType, Field};
+use arrow::array::{Array, ArrayRef};
+use arrow::array::{Float32Array, Float64Array};
+use arrow::datatypes::DataType;
 
 use std::sync::Arc;
 
+macro_rules! compute_op {
+    ($ARRAY:expr, $FUNC:ident, $TYPE:ident) => {{
+        let mut builder = <$TYPE>::builder($ARRAY.len());
+        for i in 0..$ARRAY.len() {
+            if $ARRAY.is_null(i) {
+                builder.append_null()?;
+            } else {
+                builder.append_value($ARRAY.value(i).$FUNC())?;
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+macro_rules! downcast_compute_op {
+    ($ARRAY:expr, $NAME:expr, $FUNC:ident, $TYPE:ident) => {{
+        let n = $ARRAY.as_any().downcast_ref::<$TYPE>();
+        match n {
+            Some(array) => compute_op!(array, $FUNC, $TYPE),
+            _ => Err(ExecutionError::General(format!(
+                "Invalid data type for {}",
+                $NAME
+            ))),
+        }
+    }};
+}
+
+macro_rules! unary_primitive_array_op {
+    ($ARRAY:expr, $NAME:expr, $FUNC:ident) => {{
+        match ($ARRAY).data_type() {
+            DataType::Float32 => downcast_compute_op!($ARRAY, $NAME, $FUNC, Float32Array),
+            DataType::Float64 => downcast_compute_op!($ARRAY, $NAME, $FUNC, Float64Array),
+            other => Err(ExecutionError::General(format!(
+                "Unsupported data type {:?} for function {}",
+                other, $NAME,
+            ))),
+        }
+    }};
+}
+
 macro_rules! math_unary_function {
     ($NAME:expr, $FUNC:ident) => {
         ScalarFunction::new(
             $NAME,
-            vec![Field::new("n", DataType::Float64, true)],
+            // order: from faster to slower
+            vec![vec![DataType::Float32], vec![DataType::Float64]],
             DataType::Float64,

Review comment:
       Glad you asked! Yes! It just takes a few more changes: the main issue is that this has to be consistent with logical expressions, and they currently only support a single return type. I am proposing a generalization of this whole thing here: https://github.com/apache/arrow/pull/7974 , so that both logical and physical plans yield a consistent and variable data type.







[GitHub] [arrow] andygrove commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-682598627


   Hi @jorgecarleitao and @alamb I've been thinking about this specific change some more, and I am not sure it is such a good idea to have dynamic typing when it comes to the output type of a scalar function. If I have a query that uses `sqrt` for example, I would now have to always check the return type in the plan to know what type to cast to (`f32` or `f64`) because I could get a different type each time I run it, depending on the types in the input data. I think it would be better for scalar functions to have deterministic output types, and this would be consistent with how other query engines are designed. In the case of `sqrt`, I think it would make sense to support a specific `sqrt_32` version though, as I think @alamb suggested at some point. Let me know what you think.





[GitHub] [arrow] jorgecarleitao commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-683467212


   Thanks a lot @alamb for sharing that. I now see what you mean.
   
   I do not think I was able to convince myself, likely because I did not fully understand everything. So maybe you can help me; below is my reasoning so far:
   
   #### A.
   
   This introduces an interesting feature: a UDF is no longer uniquely identifiable by its name: if a user registers two functions with equal names (like `sqrt_32` and `sqrt_64` above), they both become part of the registry, and it is up to the coercer to decide which one to use based on the type.
   
   On the one hand, that would allow registering variants of a function, which allows extending an existing UDF; OTOH, it may not be very intuitive that a single function "sqrt" can be registered multiple times, and that it is its signature and the coercer's logic that decide which one to use. 
   
   In particular, the registry can't warn or error if the user tries to re-register a function with the same name: it must assume that every new registration is a potential new variant that the coercer may pick up.
   
   Wouldn't this also mean that users would need to ensure that they do not register 2 traits with the same name and compatible `desired_return_type`? I can imagine a situation where the coercer returns non-deterministic plans due to two variants being valid, and one being picked up by chance. Or, that the order in which functions of compatible signatures are registered dictates which one is used.
   
   #### B.
   
   `concat::desired_argument_types` returns `[None, None]`.
   
   I thought that we wanted `concat` to:
   
   1. accept only `LargeUTF8` or `UTF8`
   2. accept an arbitrary number of arguments (or the user needs to write/see in the plan `concat(concat(concat(c1, c2), "-"), c4)` instead of `concat(c1, c2, "-", c4)`)
   
   Under this API, wouldn't we need to return `None` in `desired_argument_types`? I think that this gives the coercer an impossible problem: since the coercer does not know which types `concat` can coerce to (`[Large]Utf8`), it would need to scan through all data types and use `valid_argument_types` as the predicate to select which one to pick.
   
   This would be even more challenging for `array`. There, we want the coercer to coerce every argument to a type compatible across all of them (what Spark does), but we only have access to a predicate, `valid_argument_types`. I can't see how we could let the coercer know that it needs to coerce all arguments to a common type without scanning all combinations of all arguments.
   
   One alternative is to use something like the `enum Signature` from #8080, which helps the coercer focus on the set of valid types. The problem is that we can no longer guarantee non-overlapping signatures, as the same name may have two traits with two compatible signatures (point A. above), and it is not so easy to debug what is happening.
   
   #### C.
   
   I think that we also need the trait to have the function's return-type function, something like 
   
   ```
   fn return_type(arg_types: &[DataType]) -> Result<DataType>;
   ```
   
   so that we can check `f(g(c1), c2)` based on `g`'s return type.
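
   For example (a sketch, reusing this crate's `ExecutionError`), `concat`'s return type could follow its first argument:

   ```
   use crate::error::{ExecutionError, Result};
   use arrow::datatypes::DataType;

   fn return_type(arg_types: &[DataType]) -> Result<DataType> {
       match arg_types.first() {
           Some(DataType::Utf8) => Ok(DataType::Utf8),
           Some(DataType::LargeUtf8) => Ok(DataType::LargeUtf8),
           _ => Err(ExecutionError::General(
               "concat expects utf8-like arguments".to_string(),
           )),
       }
   }
   ```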
   
   #### D.
   
   Wouldn't users still need to preserve the invariant themselves that the `PhysicalExpr::evaluate` implementation coming from `make_physical_expr` matches the function's incoming types? I think that they would still need to go back and forth in the code to ensure that the `PhysicalExpr` implementation matches `struct concat`'s implementation of `desired_return_type`, like we also do today.
   
   #### E.
   
   Currently, the interface to register a UDF is:
   
   ```
   let my_add = ScalarFunction::new(
       "my_add",
       vec![DataType::Int32, DataType::Int32],
       DataType::Int32,
       myfunc,
   );
   
   ctx.register_udf(my_add);
   ```
   
   Wouldn't this new approach mean that a user would be unable to declare a UDF inline; i.e. they would always need to implement the stateless trait? This is not very relevant to SQL, but in the DataFrame API, being able to register functions "on the fly" is quite neat, as it keeps the declaration closer to its usage (in the code). 
   
   This inline style is particularly powerful when the user does not even access the registry, as Spark supports (and there is nothing forbidding us from supporting it too: we just defer the registration until a bit later).
   
   An alternative that comes to mind to support all use-cases is to use 2 closures and 1 function signature:
   
   ```
   ScalarFunction::new(
       name: String,
       function: ScalarUdf,
       return_type: ReturnType,
       signature: Signature,
   )
   ```
   
   where 
   
   ```
   pub type ScalarUdf = Arc<dyn Fn(&[ArrayRef]) -> Result<ArrayRef> + Send + Sync>;
   pub type ReturnType = Arc<dyn Fn(&Vec<DataType>) -> Result<DataType> + Send + Sync>;
   
   enum Signature; // as in #8080
   ```
   
   and offer some utility methods to register simpler functions (e.g. ```udf(name: String, fun: ScalarUdf, in: DataType, out: DataType)```).
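
   For example (hypothetical usage of the aliases above), a return-type closure that mirrors the first argument's type:

   ```
   // concat(utf8, ...) -> utf8; concat(largeutf8, ...) -> largeutf8
   let return_type: ReturnType = Arc::new(|arg_types| Ok(arg_types[0].clone()));
   ```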
   
   Final note: this whole discussion may be a bit too detailed for UDFs, and we could instead offer a simpler interface that does not handle these complex cases. However, this whole discussion applies in almost the same way to built-in functions: the only difference is whether they are of `static` lifetime or part of a registry. That is why I am so interested in all the "hard" cases.





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r483416817



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -975,7 +997,7 @@ mod tests {
         let provider = MemTable::new(Arc::new(schema), vec![vec![batch]])?;
         ctx.register_table("t", Box::new(provider));
 
-        let myfunc: ScalarUdf = Arc::new(|args: &[ArrayRef]| {

Review comment:
       This is a rename, to make it more explicit. This signature is shared between scalar UDFs and scalar built-in functions at the physical plane, thus the name `ScalarFunctionImplementation`.







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r483416446



##########
File path: rust/datafusion/src/logical_plan/mod.rs
##########
@@ -268,18 +271,16 @@ pub enum Expr {
     /// scalar function.
     ScalarFunction {
         /// The function
-        fun: functions::ScalarFunction,
+        fun: functions::BuiltinFunction,

Review comment:
       I renamed this because `ScalarFunction` could be either a built-in or a UDF.







[GitHub] [arrow] alamb commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-683710210


   # A 
   > I can imagine the situation where the coercer returns non-deterministic plans due to two variants being valid, and one being picked up by chance. Or, that the order on which functions of compatible signatures are registered dictates which one is used.
   
   Yes, you are correct, the system would have to be deterministic about which variant was chosen -- the easiest thing to understand would likely be registration order, as you suggest. 





[GitHub] [arrow] alamb commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-683411727


   > With this said, as an exercise, let me try to write how I imagine an interface could look like for option 3, just to check if I have the same understanding as you do.
   
   I think I had a slightly different idea. Here is one idea for an interface for defining UDFs that I think covers all the cases you have in mind (though it doesn't talk about implementation at all):
   
   ## UDF Registration:
   
   ```
   trait UDF {
     // return the name that the user uses to invoke this function
     fn name(&self) -> &str;
   
     // Return desired argument types.
     // If the desired type is "None" then no type coercion is done and any number of arguments
     // are accepted during logical planning.
     // If the desired type is a slice, the logical planner will require the function to be called
     // with exactly that number of arguments and will attempt to coerce arguments into these
     // types. If any type is `None` then no coercion will be done on that argument
     fn desired_argument_types(&self) -> Option<&[Option<DataType>]>;
   
     // given the specified argument types, returns true if this function can accept them
     fn valid_argument_types(arg_types: &[DataType]) -> bool;
   
     //  create the appropriate PhysicalExpression
     fn make_physical_expr(&self, arg_types: &[DataType]) -> Box<dyn PhysicalExpr>;
   }
   ```
   
   Here is an sketch of registering sqrt with both 32 and 64 variants:
   ```
   struct sqrt_32 {}
   impl UDF for sqrt_32 {
     fn name(&self) { "sqrt"}
     fn desired_argument_types(&self) { [Float32] }
     fn valid_argument_types(arg_types: &[DataType]) { arg_types == [Float32] }
     fn make_physical_expr(&self, arg_types: &[DataType]) {...}
   }
   
   struct sqrt_64 {}
   impl UDF for sqrt_64 {
     fn name(&self) { "sqrt"}
     fn desired_argument_types(&self) { [Float64] }
     fn valid_argument_types(arg_types: &[DataType]) { arg_types == [Float64] }
     fn make_physical_expr(&self, arg_types: &[DataType]) {...}
   }
   
   ```
   
   The user would write `sqrt(c)` and then the type coercion logic would change that to `sqrt_64(cast c as Float64)` or perhaps `sqrt_32(c)` (if c was Float32). 
   
   And you can imagine the type coercion logic hitting a `sqrt` function and trying to coerce the arguments to Float32 first, to match the first variant, and, if that wasn't possible, trying to coerce to Float64.
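
   A sketch of that selection logic (illustrative; `can_coerce_from` is a toy stand-in for real coercion rules):

   ```
   use arrow::datatypes::DataType;

   // toy rule for the sketch: only f32 widens to f64
   fn can_coerce_from(to: &DataType, from: &DataType) -> bool {
       matches!((to, from), (DataType::Float64, DataType::Float32))
   }

   /// Pick the first registered variant whose desired types the actual
   /// argument types can be coerced into (registration order breaks ties).
   fn pick_variant<'a>(
       variants: &'a [Vec<DataType>],
       arg_types: &[DataType],
   ) -> Option<&'a Vec<DataType>> {
       variants.iter().find(|desired| {
           desired.len() == arg_types.len()
               && desired
                   .iter()
                   .zip(arg_types.iter())
                   .all(|(d, a)| d == a || can_coerce_from(d, a))
       })
   }
   ```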
   
   Here is an example of "concat" that can take two exactly two arguments of the same type
   ```
   struct concat {}
   impl UDF for concat {
     fn name(&self) { "concat"}
     fn desired_argument_types(&self) { [None, None] }
     fn valid_argument_types(arg_types: &[DataType]) { arg_types.len() == 2 && arg_types[0] == arg_types[1] }
     fn make_physical_expr(&self, arg_types: &[DataType]) {...}
   }
   ```
   
   
   Here is an example of an `array` function
   ```
   struct array {}
   impl UDF for array {
     fn name(&self) { "array"}
     fn desired_argument_types(&self) { None }
     fn valid_argument_types(arg_types: &[DataType]) { ... custom logic to make sure all types are the same here ... }
     fn make_physical_expr(&self, arg_types: &[DataType]) {...}
   }
   ```
   





[GitHub] [arrow] alamb commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-679069890


   >  I would prefer that the user does not have to have this burden: it registers a UDF with the type, and then just plans a call without its return type, during planning.
   
   This sounds like the right approach to me.





[GitHub] [arrow] alamb commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-683711848


   # C
   > I think that we also need the trait to have the function's return-type function
   
   Yes, you are right.





[GitHub] [arrow] andygrove commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-674413474


   > The alternative is for `[[t1, t2]]` to represent one valid set of types for a function of two arguments, instead of `[[t1], [t2]]`. This way we can constrain arguments together. We would need to check that each entry on the list has the same number of entries as the function has arguments.
   
   This seems reasonable to me.
   





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r471134987



##########
File path: rust/datafusion/src/execution/physical_plan/udf.rs
##########
@@ -37,8 +37,11 @@ pub type ScalarUdf = Arc<dyn Fn(&[ArrayRef]) -> Result<ArrayRef> + Send + Sync>;
 pub struct ScalarFunction {
     /// Function name
     pub name: String,
-    /// Function argument meta-data
-    pub args: Vec<Field>,
+    /// Set of valid argument types.
+    /// The first dimension (0) represents specific combinations of valid argument types
+    /// The second dimension (1) represents the types of each argument.
+    /// For example, [[t1, t2]] is a function of 2 arguments that only accepts t1 as the first argument and t2 as the second

Review comment:
       I believe that the current API uses `ScalarFunction`:
   
   ```
   ctx.register_udf(ScalarFunction::new(
           "custom_sqrt",
           vec![vec![DataType::Float64]],
           DataType::Float64,
           Arc::new(custom_sqrt),
       ));
   ```
   
   I do not like it, but I was trying not to change it until we land an interface to declare UDFs sufficiently generic for our own UDFs with variable types.







[GitHub] [arrow] jorgecarleitao commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-687515483


   @alamb and @andygrove , after some iterations, this is my proposal for how we treat UDFs internally.
   
   This makes UDFs have the same capabilities as built-in functions, but we offer `prelude::create_udf` as a helper function that covers most use-cases of a fixed signature, fixed return type. If the user wishes complete control, `ScalarUDF::new()` covers that. We offer an example of usage of `create_udf`.
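
   A sketch of the fixed-signature path (illustrative -- `pow_impl` stands in for a `ScalarFunctionImplementation` closure, and the argument order is a sketch, not the exact API):

   ```
   // a UDF that takes two f64 columns and returns f64
   let pow = create_udf(
       "pow",
       vec![DataType::Float64, DataType::Float64], // fixed argument types
       Arc::new(DataType::Float64),                // fixed return type
       pow_impl,
   );
   ctx.register_udf(pow);
   ```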





[GitHub] [arrow] jorgecarleitao commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-675041274


   > Currently we only do optimizations on the logical plan, but there is a plan to have physical plan optimizations as well.
   
   Won't those be made against a physical plan? As far as I can tell, the coercion removed in this PR only affects the logical plan.





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r483427788



##########
File path: rust/datafusion/src/sql/planner.rs
##########
@@ -524,25 +524,18 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> {
                             args: rex_args,
                         })
                     }
-                    // finally, built-in scalar functions
+                    // finally, user-defined functions
                     _ => match self.schema_provider.get_function_meta(&name) {
                         Some(fm) => {
-                            let rex_args = function
+                            let args = function
                                 .args
                                 .iter()
                                 .map(|a| self.sql_to_rex(a, schema))
                                 .collect::<Result<Vec<Expr>>>()?;
 
-                            let mut safe_args: Vec<Expr> = vec![];

Review comment:
       The SQL planner no longer needs to add casts based on the function's signature. That behavior caused statements such as `SELECT my_sqrt(c1) FROM t` to be logically planned as `SELECT my_sqrt(CAST(c1 AS f64)) FROM t`, which is counter-intuitive when reading the result.







[GitHub] [arrow] jorgecarleitao commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-678768666


   When you say `data_type`, do you mean the arguments' types or the `return_type`?





[GitHub] [arrow] alamb commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r484350796



##########
File path: rust/datafusion/examples/simple_udf.rs
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::{
+    array::{Array, ArrayRef, Float32Array, Float64Array, Float64Builder},
+    datatypes::DataType,
+    record_batch::RecordBatch,
+    util::pretty,
+};
+
+use datafusion::error::Result;
+use datafusion::{physical_plan::functions::ScalarFunctionImplementation, prelude::*};
+use std::sync::Arc;
+
+// create local execution context with an in-memory table
+fn create_context() -> Result<ExecutionContext> {
+    use arrow::datatypes::{Field, Schema};
+    use datafusion::datasource::MemTable;
+    // define a schema.
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Float32, false),
+        Field::new("b", DataType::Float64, false),
+    ]));
+
+    // define data.
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Float32Array::from(vec![2.1, 3.1, 4.1, 5.1])),
+            Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
+        ],
+    )?;
+
+    // declare a new context. In the Spark API, this corresponds to a new Spark SQL session
+    let mut ctx = ExecutionContext::new();
+
+    // declare a table in memory. In the Spark API, this corresponds to createDataFrame(...).
+    let provider = MemTable::new(schema, vec![vec![batch]])?;
+    ctx.register_table("t", Box::new(provider));
+    Ok(ctx)
+}
+
+/// In this example we will declare a single-type, single-return-type UDF that computes the exponentiation a^b over f64 values
+fn main() -> Result<()> {
+    let mut ctx = create_context()?;
+
+    // First, declare the actual implementation of the calculation
+    let pow: ScalarFunctionImplementation = Arc::new(|args: &[ArrayRef]| {
+        // in DataFusion, all `args` and output are dynamically-typed arrays, which means that we need to:
+        // 1. cast the values to the type we want
+        // 2. perform the computation for every element in the array (using a loop or SIMD)
+        // 3. construct the resulting array
+
+        // this is guaranteed by DataFusion based on the function's signature.
+        assert_eq!(args.len(), 2);
+
+        // 1. cast both arguments to f64. These casts MUST be aligned with the signature or this function panics!
+        let base = &args[0]
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .expect("cast failed");
+        let exponent = &args[1]
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .expect("cast failed");
+
+        // this is guaranteed by DataFusion. We place it just to make it obvious.
+        assert_eq!(exponent.len(), base.len());
+
+        // 2. Arrow's builder is used to construct an Arrow array.
+        let mut builder = Float64Builder::new(base.len());
+        for index in 0..base.len() {
+            // in arrow, any value can be null.
+            // Here we decide to make our UDF return null when either base or exponent is null.
+            if base.is_null(index) || exponent.is_null(index) {
+                builder.append_null()?;
+            } else {
+                // 3. computation. Since we do not have any SIMD `pow` operation at our hands,
+                // we loop over each entry. Array's values are obtained via `.value(index)`.
+                let value = base.value(index).powf(exponent.value(index));
+                builder.append_value(value)?;
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    });
+
+    // Next:
+    // * git it a name (so that it shows nicely when the plan is printed)

Review comment:
       ```suggestion
       // * give it a name so that it shows nicely when the plan is printed
       //   and `pow` can be used in expressions
   ```
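
    For reference, the registration that this comment alludes to presumably looks something like the sketch below (the argument order of `create_udf` and the `register_udf`/`sql` methods are assumptions based on this PR):

    ```rust
    // sketch: register `pow` under a name so plans print nicely and the
    // function becomes callable from SQL
    ctx.register_udf(create_udf(
        "pow",                                      // name shown in plans
        vec![DataType::Float64, DataType::Float64], // argument types
        Arc::new(DataType::Float64),                // return type
        pow,                                        // the implementation above
    ));

    // the function can then be used by name
    let df = ctx.sql("SELECT pow(a, b) FROM t")?;
    ```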

##########
File path: rust/datafusion/examples/simple_udf.rs
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       This example is really nice -- and the comments throughout make it easy for me to follow

##########
File path: rust/datafusion/src/execution/dataframe_impl.rs
##########
@@ -232,6 +241,50 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn registry() -> Result<()> {
+        let mut ctx = ExecutionContext::new();
+        register_aggregate_csv(&mut ctx)?;
+
+        // declare the udf
+        let my_add: ScalarFunctionImplementation = Arc::new(|args: &[ArrayRef]| {

Review comment:
       This doesn't seem right -- the implementation uses two arguments, but the call to `create_udf` only registers a single float64 arg. Given that the `my_add` function never actually gets invoked, this doesn't cause a problem in this test.
   
   I suggest changing the body of `my_add` to be `unimplemented!("my_add is not implemented")` to make it clear the code is not executed during this test.
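
    Something like this sketch:

    ```rust
    // sketch of the suggestion: make it obvious the body never runs in this test
    let my_add: ScalarFunctionImplementation =
        Arc::new(|_args: &[ArrayRef]| unimplemented!("my_add is not implemented"));
    ```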

##########
File path: rust/datafusion/src/physical_plan/functions.rs
##########
@@ -232,6 +246,80 @@ fn signature(fun: &ScalarFunction) -> Signature {
     }
 }
 
+/// Physical expression of a scalar function
+pub struct ScalarFunctionExpr {
+    fun: ScalarFunctionImplementation,
+    name: String,
+    args: Vec<Arc<dyn PhysicalExpr>>,
+    return_type: DataType,
+}
+
+impl Debug for ScalarFunctionExpr {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("ScalarFunctionExpr")
+            .field("fun", &"<FUNC>")
+            .field("name", &self.name)
+            .field("args", &self.args)
+            .field("return_type", &self.return_type)
+            .finish()
+    }
+}
+
+impl ScalarFunctionExpr {
+    /// Create a new Scalar function
+    pub fn new(
+        name: &str,
+        fun: ScalarFunctionImplementation,
+        args: Vec<Arc<dyn PhysicalExpr>>,
+        return_type: &DataType,
+    ) -> Self {
+        Self {
+            fun,
+            name: name.to_owned(),
+            args,
+            return_type: return_type.clone(),
+        }
+    }
+}
+
+impl fmt::Display for ScalarFunctionExpr {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(
+            f,
+            "{}({})",
+            self.name,
+            self.args
+                .iter()
+                .map(|e| format!("{}", e))
+                .collect::<Vec<String>>()
+                .join(", ")
+        )
+    }
+}
+
+impl PhysicalExpr for ScalarFunctionExpr {
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(self.return_type.clone())
+    }
+
+    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {

Review comment:
       Allowing a user-defined function's definition to specify whether or not it is nullable is probably something we should do in a future PR.
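
    Purely as a hypothetical sketch of what that future API could look like (the field is invented for illustration):

    ```rust
    // hypothetical: extend the UDF definition so it carries its own
    // nullability instead of the expression hard-coding an answer
    pub struct ScalarFunction {
        // ...existing fields...
        /// whether the function may return null for non-null inputs
        pub nullable: bool,
    }
    ```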

##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -368,32 +368,22 @@ impl DefaultPhysicalPlanner {
                     .collect::<Result<Vec<_>>>()?;
                 functions::create_physical_expr(fun, &physical_args, input_schema)
             }
-            Expr::ScalarUDF {
-                name,
-                args,
-                return_type,
-            } => match ctx_state.scalar_functions.get(name) {
-                Some(f) => {
-                    let mut physical_args = vec![];
-                    for e in args {
-                        physical_args.push(self.create_physical_expr(
-                            e,
-                            input_schema,
-                            ctx_state,
-                        )?);
-                    }
-                    Ok(Arc::new(ScalarFunctionExpr::new(
-                        name,
-                        f.fun.clone(),
-                        physical_args,
-                        return_type,
-                    )))
+            Expr::ScalarUDF { fun, args } => {
+                let mut physical_args = vec![];
+                for e in args {
+                    physical_args.push(self.create_physical_expr(
+                        e,
+                        input_schema,
+                        ctx_state,
+                    )?);
                 }
-                _ => Err(ExecutionError::General(format!(
-                    "Invalid scalar function '{:?}'",
-                    name
-                ))),
-            },
+
+                udf::create_physical_expr(

Review comment:
       And an error will occur here if the inputs can't be coerced to the types required by the function's signature.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-683711505


   # B 
   I see what you mean now -- I think I was getting confused about which code will do the coercion of input types. I was thinking that the built-in type coercion logic could handle simple cases, and that for anything more complicated (e.g. the array use case of coercing all arguments to the same type) the UDF implementation would handle it. I can see, however, why having this logic built into DataFusion could be helpful.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-674411085


   @jorgecarleitao This looks good, but I have one concern/question. What happens if I declare a function with two args, both of which can be Float32 or Float64, but when I call it, one arg is i8 and the other arg is f64? Would they both get cast to f64 in this case? Or would one be cast to f32 and the other to f64? The latter would not work if the function requires both types to be the same.
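
   To make the scenario concrete, here is a sketch of the declaration in question using this PR's `Vec<Vec<DataType>>` signature format, where each inner vec is one valid combination of argument types (values assumed):

   ```rust
   // both arguments accept Float32 or Float64, but must match within a combination
   let arg_types = vec![
       vec![DataType::Float32, DataType::Float32],
       vec![DataType::Float64, DataType::Float64],
   ];
   // with an i8 and an f64 argument, coercion has to pick one combination that
   // both arguments can be cast to -- presumably [Float64, Float64] here
   ```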


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-682868097


   Thanks @Jorge Cardoso Leitão <jo...@gmail.com>, it seems like I
   misunderstood the scope of the change. I will review your response this
   weekend to make sure I understand.
   
   > What I struggle a bit to understand is that I have raised this concern
   before and no-one has offered a solution to it.
   
   It is likely that others just haven't had the time to look yet. I know I've
   had little time this week due to work commitments.
   
   
   On Fri, Aug 28, 2020 at 10:24 AM Jorge Leitao <no...@github.com>
   wrote:
   
   > The discussion for me is not so much about the return type of the function
   > sqrt. I am fine with sqrt_f32. My issue is with the output of the
   > function array, collect_list, concat, and a significant number of other
   > functions whose return type is variable.
   >
   > I do not think that other query engines are designed to have a fixed
   > return type: collect_list has a variable return type, getitem has a
   > variable return type; array has a variable return type. Essentially,
   > every function that works on non-primitive types have a variable return
   > type, because there are common operations on structs that are irrespective
   > of the specific types that they hold.
   >
   > The functions that I see we will hit soon are functions that operate on
   > strings: arrow supports utf8 and Largeutf8. Are we also making all
   > functions that operate on strings (e.g. concat) always return Largeutf8,
   > or utf8? What about binary and LargeBinary? I feel that by forcing a
   > single return type, DataFusion drifts farther away from Arrow as we end up
   > supporting a smaller and smaller subset of the types that arrow supports on
   > its execution plans.
   >
   > For me, we should embrace the fact that most engines migrated to variable
   > return types at some point in time during their life-time and design a
   > typing/function system that caters for that from the get go, so that we
   > have all options available to us when we want to expand our function set.
   >
   > At the moment, I am confident that the approach in #8032
   > <https://github.com/apache/arrow/pull/8032> covers all relevant
   > use-cases, and is also fully aligned with how we do it for binary math and
   > logical operators.
   >
   > Of course I would respect the decision to make all built-in functions/UDF
   > of a fixed return type, and design the engine assuming that invariant, but
   > it would be difficult for me to continue to contribute to that part of the
   > code base when I am 90% certain that we will hit another design blocker
   > when we want to implement array([c1, c2]) or something. The only viable
   > option I see to add a new function such as array, without an item in Expr
   > that generically supports them, is to introduce a new entry to enum Expr.
   > Doing so is backward incompatible, which means that we will introducing a
   > backward incompatible change every time we need to add a new function of
   > variable return type.
   >
   > What I struggle a bit to understand is that I have raised this concern
   > before and no-one has offered a solution to it.
   > Specifically, if we design our UDF's API (which currently supports all our
   > built-in functions) as fixed return type, how will we support array and
   > other scalar functions of variable return type without introducing a
   > backward incompatible change on every new function?
   >
   > —
   > You are receiving this because you were mentioned.
   > Reply to this email directly, view it on GitHub
   > <https://github.com/apache/arrow/pull/7967#issuecomment-682832105>, or
   > unsubscribe
   > <https://github.com/notifications/unsubscribe-auth/AAHEBRBKXIXSQ6XBF4QZ2YLSC7K2FANCNFSM4QAG5BEA>
   > .
   >
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-682832105


   The discussion for me is not so much about the return type of the function `sqrt`. I am fine with `sqrt_f32`. My issue is with the output of the function `array`, `collect_list`, `concat`, and a significant number of other functions whose return type is variable.
   
   I do not think that other query engines are designed to have a fixed return type: `collect_list` has a variable return type, `getitem` has a variable return type, `array` has a variable return type. Essentially, every function that works on non-primitive types has a variable return type, because there are common operations on structs that apply irrespective of the specific types they hold.
   
   The functions that I see us hitting soon are functions that operate on strings: arrow supports `Utf8` and `LargeUtf8`. Are we also making all functions that operate on strings (e.g. `concat`) always return `LargeUtf8`, or `Utf8`? What about `Binary` and `LargeBinary`? I feel that by forcing a single return type, DataFusion drifts farther away from Arrow, as we end up supporting a smaller and smaller subset of the types that arrow supports in its execution plans.
   
   For me, we should embrace the fact that most engines migrated to variable return types at some point during their lifetime, and design a typing/function system that caters for that from the get-go, so that we have all options available when we want to expand our function set.
   
   At the moment, I am confident that the approach in #8032 covers all relevant use-cases, and is also fully aligned with how we do it for binary math and logical operators.
   
   Of course I would respect the decision to make all built-in functions/UDFs have a fixed return type, and design the engine assuming that invariant, but it would be difficult for me to continue to contribute to that part of the code base when I am 90% certain that we will hit another design blocker when we want to implement `array([c1, c2])` or something. The only viable option I see to add a new function such as `array`, without an item in `Expr` that generically supports them, is to introduce a new entry to `enum Expr`. Doing so is backward incompatible, which means that we will be introducing a backward incompatible change every time we need to add a new function of variable return type.
   
   What I struggle a bit to understand is that I have raised this concern before and no-one has offered a solution to it.
   Specifically, if we design our UDFs' API (which currently supports all our built-in functions) around a fixed return type, how will we support `array` and other scalar functions of variable return type without introducing a backward incompatible change with every new function?
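
   To make the variable-return-type point concrete, here is a sketch of the kind of rule such a function needs (the function name is hypothetical, and arrow's `DataType::List` variant is assumed):

   ```rust
   // hypothetical: the return type of `array` is derived from its inputs,
   // so it cannot be written down as a single fixed DataType
   fn array_return_type(arg_types: &[DataType]) -> Result<DataType> {
       // assume all arguments were already coerced to one common type
       Ok(DataType::List(Box::new(arg_types[0].clone())))
   }
   ```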


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-683275858


   I wonder if it would be helpful to think about types in logical and physical plans differently. 
   
   Let's take @jorgecarleitao's `concat` as an example and test the output of concatenating two columns together. I think the ideal experience is to use an expression that appears in a query / data frame, like the following. 
   
   ```
   'FooBar' = concat(A, B)
   ```
   
   And let's consider what has to happen when running the `ExecutionPlan` when  A and B are both  `Utf8` columns or are both `LargeUtf8` columns. 
   
   
   ### Execution Plan Option 1 
   Use a single function that can handle either Utf8 or LargeUtf8 inputs (what I think this PR is proposing)
   
   Pros:
   1. The same function information can be used both in logical and physical plans, and the semantics are clear
   
   Cons
   1. The type signature (aka what input types will be supplied and what output types will be produced) needs to be computed both at plan time (e.g. by the type coercion logic, so the output type is known) and at runtime (the actual UDF needs to switch on input type), and those computations need to remain consistent
   2. The implementation of the operator might be more complicated (as all the combinations of supported types must be enumerated and the actual calculation needs to support all those combinations)
   
   ### Execution Plan Option 2
   Use distinct functions for different type signatures (e.g. `concat` and `concat_large`). I *think* this is what Spark does (https://docs.databricks.com/spark/latest/spark-sql/udf-scala.html#register-a-function-as-a-udf)
   
   Pros;
   1. Each implementation is simpler and unambiguous as to its type
   2. The type coercion logic is also simpler as there is only a single type signature that could match
   
   Cons:
   1. More verbose user experience as now users have to specify the exact function they want (e.g. `concat` or `concat_large`). I think this is what @jorgecarleitao  was describing with "functions that return different types"
   
   ### Execution Plan Option 3 (proposal)
   Have a single expression that can take multiple type signatures, and provide multiple UDFs that each have a single type signature. 
   
   So that would mean the expression:
   ```
   'FooBar' = concat(A, B)
   ```
   
   If A and B were Utf8, the physical expression would be 
   
   ```
   'FooBar' = concat_utf8(A, B)
   ```
   
   And if A and B were LargeUtf8, then the physical expression would be
   ```
   'FooBar' = concat_long_utf8(A, B)
   ```
   
   And perhaps the type coercion logic would be responsible for picking which specific function to use
   
   Pros:
   1. Allows for reasonable user experience (can use `concat`)
   2. The types are all well known at execution plan time.
   
   Cons:
   1. Would be a larger change to DataFusion
   2. May not align with what other systems do. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r471097019



##########
File path: rust/datafusion/src/execution/physical_plan/udf.rs
##########
@@ -37,8 +37,11 @@ pub type ScalarUdf = Arc<dyn Fn(&[ArrayRef]) -> Result<ArrayRef> + Send + Sync>;
 pub struct ScalarFunction {
     /// Function name
     pub name: String,
-    /// Function argument meta-data
-    pub args: Vec<Field>,
+    /// Set of valid argument types.
+    /// The first dimension (0) represents specific combinations of valid argument types
+    /// The second dimension (1) represents the types of each argument.
+    /// For example, [[t1, t2]] is a function of 2 arguments that only accept t1 on the first arg and t2 on the second
+    pub args: Vec<Vec<DataType>>,

Review comment:
       ```suggestion
       pub arg_types: Vec<Vec<DataType>>,
   ```

##########
File path: rust/datafusion/src/execution/physical_plan/udf.rs
##########
@@ -37,8 +37,11 @@ pub type ScalarUdf = Arc<dyn Fn(&[ArrayRef]) -> Result<ArrayRef> + Send + Sync>;
 pub struct ScalarFunction {
     /// Function name
     pub name: String,
-    /// Function argument meta-data
-    pub args: Vec<Field>,
+    /// Set of valid argument types.
+    /// The first dimension (0) represents specific combinations of valid argument types
+    /// The second dimension (1) represents the types of each argument.
+    /// For example, [[t1, t2]] is a function of 2 arguments that only accept t1 on the first arg and t2 on the second

Review comment:
       I suggest you move this comment to `FunctionMeta` @ https://github.com/apache/arrow/pull/7967/files#diff-be9adb93b0effc41a9672892e1aed3e1R50 (which is more likely where someone writing a UDF would be interacting) and refer to that comment here

##########
File path: rust/datafusion/src/logicalplan.rs
##########
@@ -75,7 +75,7 @@ impl FunctionMeta {
         &self.name
     }
     /// Getter for the arg list
-    pub fn args(&self) -> &Vec<Field> {
+    pub fn args(&self) -> &Vec<Vec<DataType>> {

Review comment:
       ```suggestion
       pub fn arg_types(&self) -> &Vec<Vec<DataType>> {
   ```

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -198,6 +215,50 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     }
 }
 
+/// tries to re-cast expressions under schema based on the set of valid signatures
+fn maybe_rewrite(
+    expressions: &Vec<Expr>,
+    current_types: &Vec<DataType>,
+    schema: &Schema,
+    signature: &Vec<Vec<DataType>>,
+) -> Result<Option<Vec<Expr>>> {
+    // for each set of valid signatures, try to coerse all expressions to one of them

Review comment:
       ```suggestion
       // for each set of valid signatures, try to coerce all expressions to one of them
   ```

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -198,6 +215,50 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     }
 }
 
+/// tries to re-cast expressions under schema based on the set of valid signatures
+fn maybe_rewrite(
+    expressions: &Vec<Expr>,
+    current_types: &Vec<DataType>,
+    schema: &Schema,
+    signature: &Vec<Vec<DataType>>,
+) -> Result<Option<Vec<Expr>>> {
+    // for each set of valid signatures, try to coerse all expressions to one of them
+    let mut new_expressions: Option<Vec<Expr>> = None;

Review comment:
       ```suggestion
   ```

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -198,6 +215,50 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     }
 }
 
+/// tries to re-cast expressions under schema based on the set of valid signatures
+fn maybe_rewrite(
+    expressions: &Vec<Expr>,
+    current_types: &Vec<DataType>,
+    schema: &Schema,
+    signature: &Vec<Vec<DataType>>,
+) -> Result<Option<Vec<Expr>>> {
+    // for each set of valid signatures, try to coerse all expressions to one of them
+    let mut new_expressions: Option<Vec<Expr>> = None;
+    for valid_types in signature {
+        // for each option, try to coerse all arguments to it
+        if let Some(types) = maybe_coerse(valid_types, &current_types) {
+            // yes: let's re-write the expressions
+            new_expressions = Some(
+                expressions
+                    .iter()
+                    .enumerate()
+                    .map(|(i, expr)| expr.cast_to(&types[i], schema))
+                    .collect::<Result<Vec<_>>>()?,
+            );
+            break;
+        }
+        // we cannot: try the next
+    }
+    Ok(new_expressions)
+}
+
+/// Try to coerse current_types into valid_types
+fn maybe_coerse(
+    valid_types: &Vec<DataType>,
+    current_types: &Vec<DataType>,
+) -> Option<Vec<DataType>> {
+    let mut super_type = Vec::with_capacity(valid_types.len());
+    for (i, valid_type) in valid_types.iter().enumerate() {
+        let current_type = &current_types[i];
+        if let Ok(t) = utils::get_supertype(current_type, valid_type) {
+            super_type.push(t)
+        } else {
+            return None;
+        }
+    }
+    Some(super_type)
+}
+
 #[cfg(test)]

Review comment:
       I think we should have some unit tests of `maybe_coerse ` and `maybe_rewrite` -- I reviewed the logic and it looks good to me, but tests would help against regressions (someone messing up the code in the future without realizing it b/c tests kept passing) as well as serving as another form of documentation (enumerating expected inputs and outputs)
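
    For instance, something along these lines (a sketch, assuming `get_supertype(Int32, Float64)` yields `Float64`, and using the `maybe_coerce` spelling suggested above):

    ```rust
    #[test]
    fn coerce_int32_to_float64() {
        // an Int32 argument should be cast up to the required Float64
        let valid_types = vec![DataType::Float64];
        let current_types = vec![DataType::Int32];
        assert_eq!(
            maybe_coerce(&valid_types, &current_types),
            Some(vec![DataType::Float64])
        );
    }
    ```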

##########
File path: rust/datafusion/src/logicalplan.rs
##########
@@ -47,8 +47,8 @@ pub enum FunctionType {
 pub struct FunctionMeta {
     /// Function name
     name: String,
-    /// Function arguments
-    args: Vec<Field>,
+    /// Function arguments. Each argument i can be one of the types of args[i], with respective priority
+    args: Vec<Vec<DataType>>,

Review comment:
       ```suggestion
       arg_types: Vec<Vec<DataType>>,
   ```

##########
File path: rust/datafusion/src/execution/physical_plan/udf.rs
##########
@@ -60,7 +63,7 @@ impl ScalarFunction {
     /// Create a new ScalarFunction
     pub fn new(
         name: &str,
-        args: Vec<Field>,
+        args: Vec<Vec<DataType>>,

Review comment:
       ```suggestion
           arg_types: Vec<Vec<DataType>>,
   ```

##########
File path: rust/datafusion/src/logicalplan.rs
##########
@@ -59,7 +59,7 @@ impl FunctionMeta {
     #[allow(missing_docs)]
     pub fn new(
         name: String,
-        args: Vec<Field>,
+        args: Vec<Vec<DataType>>,

Review comment:
       ```suggestion
           arg_types: Vec<Vec<DataType>>,
   ```

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -198,6 +215,50 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     }
 }
 
+/// tries to re-cast expressions under schema based on the set of valid signatures
+fn maybe_rewrite(
+    expressions: &Vec<Expr>,
+    current_types: &Vec<DataType>,
+    schema: &Schema,
+    signature: &Vec<Vec<DataType>>,
+) -> Result<Option<Vec<Expr>>> {
+    // for each set of valid signatures, try to coerse all expressions to one of them
+    let mut new_expressions: Option<Vec<Expr>> = None;
+    for valid_types in signature {
+        // for each option, try to coerse all arguments to it

Review comment:
       ```suggestion
           // for each option, try to coerce all arguments to it
   ```

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -198,6 +215,50 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     }
 }
 
+/// tries to re-cast expressions under schema based on the set of valid signatures
+fn maybe_rewrite(
+    expressions: &Vec<Expr>,
+    current_types: &Vec<DataType>,
+    schema: &Schema,
+    signature: &Vec<Vec<DataType>>,
+) -> Result<Option<Vec<Expr>>> {
+    // for each set of valid signatures, try to coerse all expressions to one of them
+    let mut new_expressions: Option<Vec<Expr>> = None;
+    for valid_types in signature {
+        // for each option, try to coerse all arguments to it
+        if let Some(types) = maybe_coerse(valid_types, &current_types) {
+            // yes: let's re-write the expressions
+            new_expressions = Some(
+                expressions
+                    .iter()
+                    .enumerate()
+                    .map(|(i, expr)| expr.cast_to(&types[i], schema))
+                    .collect::<Result<Vec<_>>>()?,
+            );
+            break;
+        }
+        // we cannot: try the next
+    }
+    Ok(new_expressions)
+}
+
+/// Try to coerse current_types into valid_types
+fn maybe_coerse(

Review comment:
       ```suggestion
   fn maybe_coerce(
   ```

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -84,29 +84,46 @@ impl<'a> TypeCoercionRule<'a> {
                 args,
                 return_type,
             } => {
-                // cast the inputs of scalar functions to the appropriate type where possible
+                // cast the inputs of scalar functions to the appropriate type
                 match self.scalar_functions.get(name) {
                     Some(func_meta) => {
-                        let mut func_args = Vec::with_capacity(args.len());
-                        for i in 0..args.len() {
-                            let field = &func_meta.args[i];
-                            let expr = self.rewrite_expr(&args[i], schema)?;
-                            let actual_type = expr.get_type(schema)?;
-                            let required_type = field.data_type();
-                            if &actual_type == required_type {
-                                func_args.push(expr)
-                            } else {
-                                let super_type =
-                                    utils::get_supertype(&actual_type, required_type)?;
-                                func_args.push(expr.cast_to(&super_type, schema)?);
-                            }
-                        }
+                        // compute the current types and expressions
+                        let expressions = args
+                            .iter()
+                            .map(|e| self.rewrite_expr(e, schema))
+                            .collect::<Result<Vec<_>>>()?;
+
+                        // compute the current types and expressions

Review comment:
       ```suggestion
   ```

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -198,6 +215,50 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     }
 }
 
+/// tries to re-cast expressions under schema based on the set of valid signatures
+fn maybe_rewrite(
+    expressions: &Vec<Expr>,
+    current_types: &Vec<DataType>,
+    schema: &Schema,
+    signature: &Vec<Vec<DataType>>,
+) -> Result<Option<Vec<Expr>>> {
+    // for each set of valid signatures, try to coerse all expressions to one of them
+    let mut new_expressions: Option<Vec<Expr>> = None;
+    for valid_types in signature {
+        // for each option, try to coerse all arguments to it
+        if let Some(types) = maybe_coerse(valid_types, &current_types) {

Review comment:
       ```suggestion
           if let Some(types) = maybe_coerce(valid_types, &current_types) {
   ```

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -198,6 +215,50 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     }
 }
 
+/// tries to re-cast expressions under schema based on the set of valid signatures
+fn maybe_rewrite(
+    expressions: &Vec<Expr>,
+    current_types: &Vec<DataType>,
+    schema: &Schema,
+    signature: &Vec<Vec<DataType>>,
+) -> Result<Option<Vec<Expr>>> {
+    // for each set of valid signatures, try to coerse all expressions to one of them
+    let mut new_expressions: Option<Vec<Expr>> = None;
+    for valid_types in signature {
+        // for each option, try to coerse all arguments to it
+        if let Some(types) = maybe_coerse(valid_types, &current_types) {
+            // yes: let's re-write the expressions
+            new_expressions = Some(
+                expressions
+                    .iter()
+                    .enumerate()
+                    .map(|(i, expr)| expr.cast_to(&types[i], schema))
+                    .collect::<Result<Vec<_>>>()?,
+            );
+            break;
+        }
+        // we cannot: try the next
+    }
+    Ok(new_expressions)

Review comment:
       ```suggestion
       Ok(None)
   ```
   
   (this is stylistic suggestion -- you can avoid a `mut` variable and just return on the branch when the type coercion worked) 

##########
File path: rust/datafusion/src/sql/planner.rs
##########
@@ -523,10 +523,14 @@ impl<S: SchemaProvider> SqlToRel<S> {
 
                             let mut safe_args: Vec<Expr> = vec![];
                             for i in 0..rex_args.len() {
-                                safe_args.push(
-                                    rex_args[i]
-                                        .cast_to(fm.args()[i].data_type(), schema)?,
-                                );
+                                let expr = if fm.args()[i]
+                                    .contains(&rex_args[i].get_type(schema)?)
+                                {
+                                    rex_args[i].clone()
+                                } else {
+                                    rex_args[i].cast_to(&fm.args()[i][0], schema)?

Review comment:
       Adding a test for a user defined function that takes no args would probably be good -- an example of such a function might be `rand()` 
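
    A sketch of such a zero-argument UDF, to illustrate the wrinkle (hypothetical; the `rand` crate is assumed, and `ScalarUdf` is the implementation type from this PR):

    ```rust
    use rand::random;

    // with no input arrays there is no row count to mirror, which is
    // exactly what makes the zero-argument case worth testing
    let my_rand: ScalarUdf = Arc::new(|_args: &[ArrayRef]| {
        Ok(Arc::new(Float64Array::from(vec![random::<f64>()])))
    });
    ```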

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -198,6 +215,50 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     }
 }
 
+/// tries to re-cast expressions under schema based on the set of valid signatures
+fn maybe_rewrite(
+    expressions: &Vec<Expr>,
+    current_types: &Vec<DataType>,
+    schema: &Schema,
+    signature: &Vec<Vec<DataType>>,
+) -> Result<Option<Vec<Expr>>> {
+    // for each set of valid signatures, try to coerse all expressions to one of them
+    let mut new_expressions: Option<Vec<Expr>> = None;
+    for valid_types in signature {
+        // for each option, try to coerse all arguments to it
+        if let Some(types) = maybe_coerse(valid_types, &current_types) {
+            // yes: let's re-write the expressions
+            new_expressions = Some(
+                expressions
+                    .iter()
+                    .enumerate()
+                    .map(|(i, expr)| expr.cast_to(&types[i], schema))
+                    .collect::<Result<Vec<_>>>()?,
+            );
+            break;

Review comment:
       ```suggestion
   ```

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -198,6 +215,50 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     }
 }
 
+/// tries to re-cast expressions under schema based on the set of valid signatures
+fn maybe_rewrite(
+    expressions: &Vec<Expr>,
+    current_types: &Vec<DataType>,
+    schema: &Schema,
+    signature: &Vec<Vec<DataType>>,
+) -> Result<Option<Vec<Expr>>> {
+    // for each set of valid signatures, try to coerse all expressions to one of them
+    let mut new_expressions: Option<Vec<Expr>> = None;
+    for valid_types in signature {
+        // for each option, try to coerse all arguments to it
+        if let Some(types) = maybe_coerse(valid_types, &current_types) {
+            // yes: let's re-write the expressions
+            new_expressions = Some(

Review comment:
       ```suggestion
               return Some(
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-675035668


   Currently we only do optimizations on the logical plan, but there is a plan
   to have physical plan optimizations as well.
   
   On Mon, Aug 17, 2020, 11:37 AM Jorge Leitao <no...@github.com>
   wrote:
   
   > *@jorgecarleitao* commented on this pull request.
   > ------------------------------
   >
   > In rust/datafusion/src/sql/planner.rs
   > <https://github.com/apache/arrow/pull/7967#discussion_r471652148>:
   >
   > > @@ -515,27 +515,29 @@ impl<S: SchemaProvider> SqlToRel<S> {
   >                      }
   >                      _ => match self.schema_provider.get_function_meta(&name) {
   >                          Some(fm) => {
   > -                            let rex_args = function
   >
   > AFAI recall, optimizations happen on the logical plan alone. Also, I think
   > that 52218c8
   > <https://github.com/apache/arrow/commit/52218c852b7b3016afeaf95d8a46d6deea89d231>
   > removes the coercion in constructing the logical, not physical, plan (SQL
   > nodes -> physical).
   >
   > If this is the case, logical -> physical does not have to worry about
   > these, right?
   >
   > —
   > You are receiving this because you were mentioned.
   > Reply to this email directly, view it on GitHub
   > <https://github.com/apache/arrow/pull/7967#discussion_r471652148>, or
   > unsubscribe
   > <https://github.com/notifications/unsubscribe-auth/AAHEBRG3AGK3SB5ZALBMEUTSBFTEDANCNFSM4QAG5BEA>
   > .
   >
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao edited a comment on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao edited a comment on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-678825604


   I pushed a new commit for this. Essentially, the new commit moves the type coercion of UDFs to the physical plan, thus aligning this code base with the current master after #8024, which no longer performs type coercion on the logical plan (as it violates schema invariance).
   
   Consequently, the type coercer optimizer became useless and was therefore removed. :)
   
   So, UDFs are now treated like other physical operations in that we offer numerical coercion of their arguments at the physical level, so that users do not have to match the types exactly, nor worry about schemas changing due to re-casting.
   
   This is ready to re-review, @alamb and @andygrove . Thank you for your patience!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-683318690


   Thanks @alamb for writing this up. I think that it is a good summary.
   
   I generally agree with options 1 and 3, as both offer the same user experience and IMO differ only in our internal implementation. What I like about 1 and 3 is that the user's intent is logically clear from `concat([c1, c2])`: concatenate the two columns (and I do not care whether they are large or not; just do your best). If a user wants a specific type, they can cast explicitly.
   
   With this said, as an exercise, let me try to write out how I imagine an interface could look for option 3, just to check whether I have the same understanding as you do.
   
   First, the actual functions:
   
   ```
   pub fn concat(args: &[StringArray]) -> Result<StringArray>;
   pub fn concat_large(args: &[LargeStringArray]) -> Result<LargeStringArray>;
   ```
   
   Both implementations would be similar code-wise, and thus we would probably write a macro and use it in each function, for DRY purposes -- something like the sketch below.
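
   A rough sketch of that macro, under the assumption that builder types exist for both string flavours (e.g. `StringBuilder` and a `LargeStringBuilder` counterpart):

   ```rust
   // sketch: one macro generating both concat variants
   macro_rules! make_concat {
       ($FN:ident, $ARRAY:ty, $BUILDER:ty) => {
           pub fn $FN(args: &[$ARRAY]) -> Result<$ARRAY> {
               let len = args[0].len();
               let mut builder = <$BUILDER>::new(len);
               for row in 0..len {
                   if args.iter().any(|a| a.is_null(row)) {
                       builder.append_null()?;
                   } else {
                       // concatenate the row's values from all arguments
                       let value: String = args.iter().map(|a| a.value(row)).collect();
                       builder.append_value(&value)?;
                   }
               }
               Ok(builder.finish())
           }
       };
   }

   make_concat!(concat, StringArray, StringBuilder);
   make_concat!(concat_large, LargeStringArray, LargeStringBuilder);
   ```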
   
   #### Physical evaluation
   
   Next, we need to physically evaluate them. This is a bit less clear to me in option 3. Is it the idea that we have one physical node per variation, whose return type is not `ArrayRef`?
   
   Or is the idea that the physical node would be the same for both?
   
   The closest to current DataFusion's design is something like
   
   ```
   impl PhysicalExpr for Concatenate {
       fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
           // evaluate the arguments
           let inputs = self
               .args
               .iter()
               .map(|e| e.evaluate(batch))
               .collect::<Result<Vec<_>>>()?;
       
           // evaluate the function
            match inputs[0].data_type() {
                DataType::Utf8 => {
                    // let inputs = downcast every arg in inputs to StringArray
                    concat(inputs)
                }
                DataType::LargeUtf8 => {
                    // let inputs = downcast every arg in inputs to LargeStringArray
                    concat_large(inputs)
                }
                other => Err(ExecutionError::General(format!(
                    "concat does not support type {:?}",
                    other
                ))),
            }
       }
   }
   ```
   
   I will run with this, but I can see that this may not be what you mean by option 3 (could you please clarify?).
   
   ### Physical planning
   
   During physical planning, e.g. `Concatenate::new(exprs)`, we need to check that this won't cause execution errors at compute time due to type mismatches. This is because both input (`RecordBatch`) and return (`ArrayRef`) are dynamically typed.
   
   So, we need to specify what types `RecordBatch` can contain (utf8 or largeutf8 in this case). To check that the return type aligns with the next node's valid types, we also need to know the return type of `Concatenate` based on its argument types. So, something like
   
   ```
   fn concatenate_valid_types() -> Vec<DataType>;
   fn concatenate_return_type(arg_types: Vec<DataType>) -> Result<DataType>;
   ```
   
   and these need to be aligned (code-wise) with `Concatenate::evaluate`.
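
   For example, the return-type rule might look like this sketch (assuming the arguments have already been coerced to one common string type; `ExecutionError::General` is used as elsewhere in DataFusion):

   ```rust
   fn concatenate_return_type(arg_types: Vec<DataType>) -> Result<DataType> {
       // concat preserves the string flavour of its inputs; this per-type
       // knowledge is exactly what must stay aligned with Concatenate::evaluate
       match arg_types.first() {
           Some(DataType::Utf8) => Ok(DataType::Utf8),
           Some(DataType::LargeUtf8) => Ok(DataType::LargeUtf8),
           _ => Err(ExecutionError::General(
               "concat expects utf8 or largeutf8 arguments".to_string(),
           )),
       }
   }
   ```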
   
   #### Logical planning
   
   Finally, logical planning. We could use `Expr::Concatenate`, or maybe `Expr::ScalarFunction` (for all built-ins). Since `Concatenate`'s return type differs based on its input, we have the same problem as before: we need to declare the return type of `Expr::Concatenate`, which we can only know from its input. So, again, we would need two functions like the ones above (or re-use them) to check that the input types are valid and to compute the return type.
   
   Again, I am trying to run through the option and see how it would look. Is this what you are thinking, or is it something different?
   
   Regardless of whether it is or not, the critical moment for me in all of this is where we perform the `downcast`, because that is the moment we go from dynamic to static typing, and my understanding is that the main difference between options 1 and 3 is where that downcast happens.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r475266854



##########
File path: rust/datafusion/src/execution/physical_plan/math_expressions.rs
##########
@@ -103,56 +103,3 @@ pub fn scalar_functions() -> Vec<ScalarFunction> {
         math_unary_function!("log10", log10),
     ]
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::error::Result;
-    use crate::{
-        execution::context::ExecutionContext,
-        logicalplan::{col, sqrt, LogicalPlanBuilder},
-    };
-    use arrow::datatypes::{Field, Schema};
-
-    #[test]
-    fn cast_i8_input() -> Result<()> {

Review comment:
       These tests were moved to another place




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-683714908


   > Final note: this whole discussion may be a bit too detailed for udfs, and we could instead offer a simpler interface to not handle these complex cases. However, this whole discussion applies in almost the same way to built-in functions: the only difference is whether they are of static lifetime or part of a registry. That is why I am so interested in all the "hard" cases.
   
   Another option that might be worth considering is "only allow simple types for user defined functions -- e.g. a single set of input types and a return type" 
   
   This would mean we could only implement functions like `array` and `concat` (as you have described it) as built-in functions, but it would definitely simplify the code and the UDF interface definition.
   
   I am not sure I like treating internal functions and external UDFs differently, but it might be worth considering. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-688881299


   @jorgecarleitao I think this PR is next in line to get merged. Please rebase when you get the chance.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-678739758


   This is currently failing for an interesting reason, and I need some help in decision making.
   
   TL;DR options:
   
   1. wait for #8024 and stop using the type coercer, since it breaks schema invariance under optimization
   2. short-term, copy some of the type coercion logic from the optimizer into the SQL planner
   
   I am more inclined to wait for the typing to be debt-free before merging this, but I am willing to hack something up here to get this in, if there is any sense of urgency on it.
   
   -------
   
   To recap, the main goal of this PR is to allow scalar UDFs to accept multiple input types during planning (still returning the same output type). This generally speeds up calculations as smaller types are faster than larger types. E.g. numerically, `sqrt(float32) -> float64` is faster than `sqrt(CAST(float32 AS float64)) -> float64`.
   
   Due to how `get_supertype` is implemented and how the SQL planner uses it, the statement 
   
   ```SELECT sqrt(c11) FROM ...``` 
   
   is (in master) converted to a logical plan already with a coercion: the column is named `"sqrt(CAST(c11 AS float64))"` when `c11` is not a float64.
   
   This PR removed the coercion from the SQL planner, as I was trying to avoid copy-pasting the new logic from the type coercer into the SQL planner. However, because our type coercer currently breaks schema invariance under optimization, this now fails the test of that invariant: the plan's schema has a field named `"sqrt(c11)"`, but the type coercer changes it to `"sqrt(CAST(c11 AS float64))"`.
   
   Note that this is independent of the changes that this PR proposes - even with a single allowed datatype, we still need to perform numerical coercion. What changed here was that the type coercion rules for multiple data types are a bit more complex, and I was trying not to have to copy-paste code from the type coercer into the SQL planner (in other words, there is some coupling between the two via `get_supertype`). IMO the root cause is the type coercer optimizer, which breaks schema invariance because it operates on the logical plan.
   
   One option is to stop using the type coercer optimizer at the logical level and instead apply cast operations at the physical level, as #8024 moves towards.
   
   Another option is to copy the type coercion that this PR proposes for the type coercer into the SQL planner, thus guaranteeing that the coercion is consistent between the planner and the type coercer.
   
   @andygrove and @alamb , what do you think?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao edited a comment on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao edited a comment on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-674555985


   > I reviewed the logic carefully in this PR and I think overall it is quite good. Nice work @jorgecarleitao. The only thing I personally think is needed before it would be ready to merge are the following tests:
   > 
   >     1. End-to-end test (in sql.rs) using a UDF with multiple possible type signatures
   > 
   >     2. Unit tests for the the coercion logic
   > 
   > 
   > All the rest of the stuff in this PR is suggestions.
   > 
   > Along with the test, it might be cool to write up some docs / comments that show how to write a UDF that actually accepts multiple types -- the only [test I see now](https://github.com/apache/arrow/pull/7967/files#diff-8273e76b6910baa123f3a25a967af3b5L1237) has a single set of argument types.
   > 
   > In this design, the UDFs are effectively "polymorphic" in the sense that they can accept multiple different argument types and will have to dispatch at runtime.
   > 
   > Another potential design for UDFs is to provide each UDF with an alias that could be duplicated and a single argument type (e.g `sqrt --> sqrt_32(f32)` and `sqrt --> sqrt_64(f64)`). Then an optimizer / coercion pass would have the logic to resolve the `sqrt` alias to `sqrt_32` or `sqrt_64` depending on the input argument types.
   > 
   > This approach might be marginally faster as the input types would be resolved once at planning time rather than during runtime. However, given that datafusion has a vectorized executor (e.g. the types would be handled once per RecordBatch) the overhead of runtime dispatch will likely not be noticable.
   
   Thank you again @alamb for taking the time to review this. I agree with all you said.
   
   My other two PRs effectively add support for polymorphic functions (including the return type), the reason being that we already do that for our own UDFs with the functions `data_type()`/`get_type()`, both at the physical and logical level. This is intrinsic to our execution model, which requires downcasting and builders inside the function. Since we require the developer to go through that pain, we may as well offer them the full flexibility that comes with it. I agree that there is a small overhead, but IMO it is small compared to execution, and we can always use some form of cached attribute if we start seeing performance issues in the planning phase.
   
   As an example of the impressive flexibility we get, I was able to run Python lambdas inside a data-fusion UDF in a [pet project of mine](https://github.com/jorgecarleitao/datafusion-python), and convert the result back to numpy arrays, which IMO is mind blowing.
   
   Changes from last time:
   
   * incorporated almost all your suggestions
   * added end-to-end tests
   * added tests to both internal functions of type coercion
   
   I think that the logic of `get_supertype` is not entirely correct atm (e.g. utf8 can be converted to all types), but we have an issue tracking that.
   
   Currently I struggle to know exactly where to place documentation for this. IMO this is not something to put in the API documentation, but rather in a user guide. I think that we need to take some time to create placeholders in our `docs/` for these. Pinging @andygrove for ideas also.





[GitHub] [arrow] alamb commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-687894822


   fyi @jorgecarleitao  I plan to look carefully at this PR again early tomorrow morning US East Coast (UTC-4) time





[GitHub] [arrow] alamb commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-683713809


   As I think you are suggesting, it would be possible to keep the ability to declare simple UDFs inline even if we also allowed a more general trait-based approach.
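   
   Purely as an illustration of that direction (none of the names below exist in the code base; they are hypothetical), a trait-based API could look something like:
   
   ```
   use arrow::array::ArrayRef;
   use arrow::datatypes::DataType;
   use datafusion::error::Result;
   
   // Hypothetical trait-based UDF API, sketched for discussion only.
   trait ScalarUdf {
       fn name(&self) -> &str;
       /// Accepted types for each argument.
       fn signature(&self) -> Vec<Vec<DataType>>;
       /// Return type as a function of the concrete argument types.
       fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;
       fn evaluate(&self, args: &[ArrayRef]) -> Result<ArrayRef>;
   }
   ```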





[GitHub] [arrow] andygrove edited a comment on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
andygrove edited a comment on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-682868097


   Thanks @Jorge, it seems like I misunderstood the scope of the change. I will
   review your response this weekend to make sure I understand.
   
   > What I struggle a bit to understand is that I have raised this concern
   > before and no-one has offered a solution to it.
   
   It is likely that others just haven't had the time to look yet. I know I've
   had little time this week due to work commitments.
   
   
   On Fri, Aug 28, 2020 at 10:24 AM Jorge Leitao <no...@github.com>
   wrote:
   
   > The discussion for me is not so much about the return type of the function
   > sqrt. I am fine with sqrt_f32. My issue is with the output of the
   > functions array, collect_list, concat, and a significant number of other
   > functions whose return type is variable.
   >
   > I do not think that other query engines are designed to have a fixed
   > return type: collect_list has a variable return type, getitem has a
   > every function that works on non-primitive types has a variable return
   > type, because there are common operations on structs that are independent
   > of the specific types that they hold.
   > of the specific types that they hold.
   >
   > The functions that I see we will hit soon are functions that operate on
   > strings: arrow supports Utf8 and LargeUtf8. Are we also making all
   > functions that operate on strings (e.g. concat) always return LargeUtf8,
   > or Utf8? What about Binary and LargeBinary? I feel that by forcing a
   > single return type, DataFusion drifts farther away from Arrow as we end up
   > supporting a smaller and smaller subset of the types that arrow supports on
   > its execution plans.
   >
   > For me, we should embrace the fact that most engines migrated to variable
   > return types at some point during their lifetime and design a
   > typing/function system that caters for that from the get go, so that we
   > have all options available to us when we want to expand our function set.
   >
   > At the moment, I am confident that the approach in #8032
   > <https://github.com/apache/arrow/pull/8032> covers all relevant
   > use-cases, and is also fully aligned with how we do it for binary math and
   > logical operators.
   >
   > Of course I would respect the decision to make all built-in functions/UDF
   > of a fixed return type, and design the engine assuming that invariant, but
   > it would be difficult for me to continue to contribute to that part of the
   > code base when I am 90% certain that we will hit another design blocker
   > when we want to implement array([c1, c2]) or something. The only viable
   > option I see to add a new function such as array, without an item in Expr
   > that generically supports them, is to introduce a new entry to enum Expr.
   > Doing so is backward incompatible, which means that we will be introducing
   > a backward-incompatible change every time we need to add a new function of
   > variable return type.
   >
   > What I struggle a bit to understand is that I have raised this concern
   > before and no-one has offered a solution to it.
   > Specifically, if we design our UDF's API (which currently supports all our
   > built-in functions) as fixed return type, how will we support array and
   > other scalar functions of variable return type without introducing a
   > backward incompatible change on every new function?
   >
   





[GitHub] [arrow] alamb commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-689455228


   🎉 Hooray!





[GitHub] [arrow] andygrove closed pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
andygrove closed pull request #7967:
URL: https://github.com/apache/arrow/pull/7967


   





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r470999040



##########
File path: rust/datafusion/src/execution/physical_plan/math_expressions.rs
##########
@@ -92,7 +122,7 @@ mod tests {
             .build()?;
         let ctx = ExecutionContext::new();
         let plan = ctx.optimize(&plan)?;
-        let expected = "Projection: sqrt(CAST(#c0 AS Float64))\
+        let expected = "Projection: sqrt(CAST(#c0 AS Float32))\

Review comment:
       Notice here how `i8` is now mapped to `Float32`, not `Float64`, as we give preference to smaller types.
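   
   A sketch of the preference rule being described follows; `can_coerce_from` below is a simplified stand-in for this PR's coercion logic, not the actual implementation:
   
   ```
   use arrow::datatypes::DataType;
   
   // Simplified stand-in for the numerical coercion rules.
   fn can_coerce_from(to: &DataType, from: &DataType) -> bool {
       use DataType::*;
       match to {
           Float32 => matches!(from, Int8 | Int16 | Float32),
           Float64 => matches!(from, Int8 | Int16 | Int32 | Int64 | Float32 | Float64),
           _ => to == from,
       }
   }
   
   // Pick the first accepted type, in declared order, that the actual type
   // can be coerced into. Listing Float32 before Float64 is what makes an
   // i8 column cast to Float32 rather than Float64.
   fn coerce(actual: &DataType, accepted: &[DataType]) -> Option<DataType> {
       if accepted.contains(actual) {
           return Some(actual.clone()); // already valid: no cast needed
       }
       accepted.iter().find(|t| can_coerce_from(t, actual)).cloned()
   }
   ```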







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r471688534



##########
File path: rust/datafusion/tests/sql.rs
##########
@@ -232,6 +326,55 @@ fn custom_sqrt(args: &[ArrayRef]) -> Result<ArrayRef> {
     Ok(Arc::new(builder.finish()))
 }
 
+fn custom_add(args: &[ArrayRef]) -> Result<ArrayRef> {
+    match (args[0].data_type(), args[1].data_type()) {
+        (DataType::Float64, DataType::Float64) => {
+            let input1 = &args[0]
+                .as_any()
+                .downcast_ref::<Float64Array>()
+                .expect("cast failed");
+            let input2 = &args[1]
+                .as_any()
+                .downcast_ref::<Float64Array>()
+                .expect("cast failed");
+
+            let mut builder = Float64Builder::new(input1.len());
+            for i in 0..input1.len() {
+                if input1.is_null(i) || input2.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(input1.value(i) + input2.value(i))?;
+                }
+            }
+            Ok(Arc::new(builder.finish()))
+        }
+        (DataType::Float32, DataType::Float32) => {
+            // this case returns a constant vector (just to be different)
+            let mut builder = Float64Builder::new(args[0].len());
+            for _ in 0..args[0].len() {
+                builder.append_value(3232.0)?;
+            }
+            Ok(Arc::new(builder.finish()))
+        }
+        (DataType::Float32, DataType::Float64) => {
+            // this case returns a constant vector (just to be different)
+            let mut builder = Float64Builder::new(args[0].len());
+            for _ in 0..args[0].len() {
+                builder.append_value(3264.0)?;
+            }
+            Ok(Arc::new(builder.finish()))
+        }
+        (_, _) => {
+            // all other cases return a constant vector (just to be different)

Review comment:
       Good point. I made the function return an Err instead, and asserted that the result of the execution is an error. Thanks!







[GitHub] [arrow] andygrove commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r471146144



##########
File path: rust/datafusion/src/execution/physical_plan/math_expressions.rs
##########
@@ -20,36 +20,62 @@
 use crate::error::ExecutionError;
 use crate::execution::physical_plan::udf::ScalarFunction;
 
-use arrow::array::{Array, ArrayRef, Float64Array, Float64Builder};
-use arrow::datatypes::{DataType, Field};
+use arrow::array::{Array, ArrayRef};
+use arrow::array::{Float32Array, Float64Array};
+use arrow::datatypes::DataType;
 
 use std::sync::Arc;
 
+macro_rules! compute_op {
+    ($ARRAY:expr, $FUNC:ident, $TYPE:ident) => {{
+        let mut builder = <$TYPE>::builder($ARRAY.len());
+        for i in 0..$ARRAY.len() {
+            if $ARRAY.is_null(i) {
+                builder.append_null()?;
+            } else {
+                builder.append_value($ARRAY.value(i).$FUNC())?;
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+macro_rules! downcast_compute_op {
+    ($ARRAY:expr, $NAME:expr, $FUNC:ident, $TYPE:ident) => {{
+        let n = $ARRAY.as_any().downcast_ref::<$TYPE>();
+        match n {
+            Some(array) => compute_op!(array, $FUNC, $TYPE),
+            _ => Err(ExecutionError::General(format!(
+                "Invalid data type for {}",
+                $NAME
+            ))),
+        }
+    }};
+}
+
+macro_rules! unary_primitive_array_op {
+    ($ARRAY:expr, $NAME:expr, $FUNC:ident) => {{
+        match ($ARRAY).data_type() {
+            DataType::Float32 => downcast_compute_op!($ARRAY, $NAME, $FUNC, Float32Array),
+            DataType::Float64 => downcast_compute_op!($ARRAY, $NAME, $FUNC, Float64Array),
+            other => Err(ExecutionError::General(format!(
+                "Unsupported data type {:?} for function {}",
+                other, $NAME,
+            ))),
+        }
+    }};
+}
+
 macro_rules! math_unary_function {
     ($NAME:expr, $FUNC:ident) => {
         ScalarFunction::new(
             $NAME,
-            vec![Field::new("n", DataType::Float64, true)],
+            // order: from faster to slower
+            vec![vec![DataType::Float32], vec![DataType::Float64]],
             DataType::Float64,

Review comment:
       Should there be a relationship between the argument type and the return type? For example, should `sqrt(f32)` return `f32` and `sqrt(f64)` return `f64`?
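   
   For comparison, here is a hedged sketch of what such a relationship could look like if the return type were derived from the argument types (this is not what the PR does at this point -- the declared return type is a single `DataType`):
   
   ```
   use arrow::datatypes::DataType;
   use datafusion::error::{ExecutionError, Result};
   
   // Hypothetical: derive the return type from the concrete argument types,
   // so that sqrt(f32) -> f32 and sqrt(f64) -> f64.
   fn sqrt_return_type(arg_types: &[DataType]) -> Result<DataType> {
       match arg_types {
           [DataType::Float32] => Ok(DataType::Float32),
           [DataType::Float64] => Ok(DataType::Float64),
           other => Err(ExecutionError::General(format!(
               "sqrt does not support {:?}",
               other
           ))),
       }
   }
   ```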







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r475266986



##########
File path: rust/datafusion/src/execution/physical_plan/planner.rs
##########
@@ -306,32 +305,25 @@ impl DefaultPhysicalPlanner {
                 input_schema,
                 data_type.clone(),
             ),
-            Expr::ScalarFunction {
-                name,
-                args,
-                return_type,
-            } => match ctx_state.scalar_functions.get(name) {
-                Some(f) => {
-                    let mut physical_args = vec![];
-                    for e in args {
-                        physical_args.push(self.create_physical_expr(
-                            e,
-                            input_schema,
-                            ctx_state.clone(),
-                        )?);
+            Expr::ScalarFunction { name, args, .. } => {
+                match ctx_state.scalar_functions.get(name) {
+                    Some(f) => {
+                        let mut physical_args = vec![];
+                        for e in args {
+                            physical_args.push(self.create_physical_expr(
+                                e,
+                                input_schema,
+                                ctx_state.clone(),
+                            )?);
+                        }
+                        function(f, physical_args, input_schema)

Review comment:
       Note how `function` is called here. It is responsible for coercing a UDF's arguments to one of its valid signatures.
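   
   Conceptually, the per-argument resolution performed there is something like the following simplified sketch (the real code then wraps any mismatched physical expression in a cast):
   
   ```
   use arrow::datatypes::DataType;
   use datafusion::error::{ExecutionError, Result};
   
   // For each argument, decide which type it must have after coercion; the
   // caller wraps any argument whose actual type differs in a CAST.
   fn resolve_types(
       actual: &[DataType],
       accepted: &[Vec<DataType>],
   ) -> Result<Vec<DataType>> {
       if actual.len() != accepted.len() {
           return Err(ExecutionError::General(
               "wrong number of arguments".to_string(),
           ));
       }
       actual
           .iter()
           .zip(accepted.iter())
           .map(|(t, valid)| {
               if valid.contains(t) {
                   Ok(t.clone()) // already a valid type: no cast needed
               } else {
                   Ok(valid[0].clone()) // cast to the preferred accepted type
               }
           })
           .collect()
   }
   ```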







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r483418021



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -987,22 +1009,23 @@ mod tests {
             Ok(Arc::new(add(l, r)?))
         });
 
-        let my_add = ScalarFunction::new(

Review comment:
       This is the main change for end users:
   
   For most use-cases, `create_udf` is enough, as it covers the common case of a single signature and a single return type.
   
   For all other use-cases, the user can use `ScalarFunction::new` directly.
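   
   A usage sketch of the common case (the function body follows the `custom_add` test in this PR; the commented-out registration call is an assumption about the exact `create_udf`/`register_udf` parameter lists):
   
   ```
   use std::sync::Arc;
   
   use arrow::array::{Array, ArrayRef, Float64Array};
   
   use datafusion::error::Result;
   
   // A plain f64 power function: one signature, one return type.
   fn pow(args: &[ArrayRef]) -> Result<ArrayRef> {
       let base = args[0]
           .as_any()
           .downcast_ref::<Float64Array>()
           .expect("cast failed");
       let exponent = args[1]
           .as_any()
           .downcast_ref::<Float64Array>()
           .expect("cast failed");
       let mut builder = Float64Array::builder(base.len());
       for i in 0..base.len() {
           if base.is_null(i) || exponent.is_null(i) {
               builder.append_null()?;
           } else {
               builder.append_value(base.value(i).powf(exponent.value(i)))?;
           }
       }
       Ok(Arc::new(builder.finish()))
   }
   
   // Registration sketch (assumed parameter list):
   // ctx.register_udf(create_udf(
   //     "pow",
   //     vec![DataType::Float64, DataType::Float64],
   //     DataType::Float64,
   //     Arc::new(pow),
   // ));
   ```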








[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r471652148



##########
File path: rust/datafusion/src/sql/planner.rs
##########
@@ -515,27 +515,29 @@ impl<S: SchemaProvider> SqlToRel<S> {
                     }
                     _ => match self.schema_provider.get_function_meta(&name) {
                         Some(fm) => {
-                            let rex_args = function

Review comment:
       As far as I recall, optimizations happen on the logical plan alone. Also, I think that 52218c8 removes the coercion when constructing the logical, not the physical, plan (SQL nodes -> logical plan).
   
   If this is the case, `logical -> physical` does not have to worry about these, right?







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#discussion_r471134155



##########
File path: rust/datafusion/src/sql/planner.rs
##########
@@ -523,10 +523,14 @@ impl<S: SchemaProvider> SqlToRel<S> {
 
                             let mut safe_args: Vec<Expr> = vec![];
                             for i in 0..rex_args.len() {
-                                safe_args.push(
-                                    rex_args[i]
-                                        .cast_to(fm.args()[i].data_type(), schema)?,
-                                );
+                                let expr = if fm.args()[i]
+                                    .contains(&rex_args[i].get_type(schema)?)
+                                {
+                                    rex_args[i].clone()
+                                } else {
+                                    rex_args[i].cast_to(&fm.args()[i][0], schema)?

Review comment:
       I am not sure we can easily declare a zero-argument UDF: we do not know how many rows to build because we have no `ArrayRef` to count them. IMO this is something that we will have to revisit.
   
   I will add a test for 2 arguments of various types.
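   
   To make the constraint concrete, here is a minimal sketch of why the implementation needs at least one argument (the function is hypothetical):
   
   ```
   use std::sync::Arc;
   
   use arrow::array::{Array, ArrayRef, Float64Array};
   
   use datafusion::error::Result;
   
   // The only way a UDF learns how many rows to produce is from the length
   // of its input arrays; with zero arguments there is nothing to measure.
   fn constant_pi(args: &[ArrayRef]) -> Result<ArrayRef> {
       let num_rows = args[0].len(); // impossible if `args` is empty
       let mut builder = Float64Array::builder(num_rows);
       for _ in 0..num_rows {
           builder.append_value(std::f64::consts::PI)?;
       }
       Ok(Arc::new(builder.finish()))
   }
   ```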







[GitHub] [arrow] jorgecarleitao commented on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-678825604


   I pushed a new commit for this. Essentially, it moves the type coercion of UDFs to the physical plan, aligning this code base with the current master after #8024, which no longer performs type coercion on the logical plan (as doing so violates schema invariance).
   
   Consequently, the type-coercion optimizer became useless and was therefore removed. :)
   
   So, UDFs are now treated like other physical operations: we offer numerical coercion of their arguments at the physical level, so that users do not have to match the declared types exactly, and without changing the resulting schema.
   
   This is ready to re-review, @alamb and @andygrove. Thank you for your patience!



[GitHub] [arrow] jorgecarleitao edited a comment on pull request #7967: ARROW-9751: [Rust] [DataFusion] Allow UDFs to accept multiple data types per argument

Posted by GitBox <gi...@apache.org>.
jorgecarleitao edited a comment on pull request #7967:
URL: https://github.com/apache/arrow/pull/7967#issuecomment-683318690


   Thanks @alamb for writing this up. I think that it is a good summary.
   
   I generally agree with options 3 and 1, as both offer the same user experience and IMO differ only in our internal implementation. What I like about 1 and 3 is that the user's intent is logically clear from `concat([c1, c2])`: concatenate the two columns (and I do not care whether they are large or not; just do your best). If a user wants a specific type, they can cast explicitly.
   
   With this said, as an exercise, let me try to write out how I imagine an interface could look for option 3, just to check that I have the same understanding as you do.
   
   First, the actual functions:
   
   ```
   pub fn concat(args: &[StringArray]) -> Result<StringArray>;
   pub fn concat_large(args: &[LargeStringArray]) -> Result<LargeStringArray>;
   ```
   
   Both implementations would be similar code-wise, and thus we would probably write a macro, and use it in each function, for DRY purposes.
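   
   For instance, a rough sketch of such a macro (the builder names and the null semantics are assumptions for illustration):
   
   ```
   use arrow::array::{
       Array, LargeStringArray, LargeStringBuilder, StringArray, StringBuilder,
   };
   
   use datafusion::error::Result;
   
   // Stamp out one concrete implementation per string array type.
   macro_rules! concat_impl {
       ($FN:ident, $ARRAY:ident, $BUILDER:ident) => {
           pub fn $FN(args: &[$ARRAY]) -> Result<$ARRAY> {
               let len = args[0].len();
               let mut builder = $BUILDER::new(len);
               for i in 0..len {
                   let mut value = String::new();
                   for arg in args {
                       // nulls concatenate as empty strings here (a choice)
                       if !arg.is_null(i) {
                           value.push_str(arg.value(i));
                       }
                   }
                   builder.append_value(&value)?;
               }
               Ok(builder.finish())
           }
       };
   }
   
   concat_impl!(concat, StringArray, StringBuilder);
   concat_impl!(concat_large, LargeStringArray, LargeStringBuilder);
   ```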
   
   #### Physical evaluation
   
   Next, we need to physically evaluate them. This is a bit less clear to me in option 3. Is the idea that we have one physical node per variation, whose return type is not `ArrayRef`?
   
   Or is the idea that the physical node would be the same for both?
   
   The closest to DataFusion's current design is something like
   
   ```
   impl PhysicalExpr for Concatenate {
       fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
           // evaluate the arguments
           let inputs = self
               .args
               .iter()
               .map(|e| e.evaluate(batch))
               .collect::<Result<Vec<_>>>()?;
       
            // evaluate the function, dispatching on the runtime type
            match inputs[0].data_type() {
                DataType::Utf8 => {
                    // let inputs = downcast every arg in inputs to StringArray
                    concat(inputs)
                }
                DataType::LargeUtf8 => {
                    // let inputs = downcast every arg in inputs to LargeStringArray
                    concat_large(inputs)
                }
                other => Err(ExecutionError::General(format!(
                    "concat does not support {:?}",
                    other
                ))),
            }
       }
   }
   ```
   
   I will run with this, but I can see that this may not be what you mean by option 3 (could you please clarify?).
   
   #### Physical planning
   
   During physical planning, e.g. `Concatenate::new(exprs)`, we need to check that this won't cause execution errors at compute time due to type mismatches. This is because both the input (`RecordBatch`) and the return value (`ArrayRef`) are dynamically typed.
   
   So, we need to specify which types the `RecordBatch` can contain (Utf8 or LargeUtf8 in this case). To check that the return type aligns with the next node's valid types, we also need to know the return type of `Concatenate` based on its argument types. So, something like
   
   ```
   fn concatenate_valid_types() -> Vec<DataType>;
   fn concatenate_return_type(arg_types: Vec<DataType>) -> Result<DataType>;
   ```
   
   and these need to be aligned (code-wise) with `Concatenate::evaluate`.
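   
   A sketch of how those two functions could be implemented, kept in sync with the match arms of `Concatenate::evaluate` above (illustrative only):
   
   ```
   use arrow::datatypes::DataType;
   use datafusion::error::{ExecutionError, Result};
   
   fn concatenate_valid_types() -> Vec<DataType> {
       vec![DataType::Utf8, DataType::LargeUtf8]
   }
   
   // The output type follows the input type.
   fn concatenate_return_type(arg_types: Vec<DataType>) -> Result<DataType> {
       match arg_types.first() {
           Some(DataType::Utf8) => Ok(DataType::Utf8),
           Some(DataType::LargeUtf8) => Ok(DataType::LargeUtf8),
           other => Err(ExecutionError::General(format!(
               "concat does not support {:?}",
               other
           ))),
       }
   }
   ```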
   
   #### Logical planning
   
   Finally, logical planning. We could use `Expr::Concatenate`, or maybe `Expr::ScalarFunction` (for all built-ins). Since `Concatenate`'s return type differs based on its input, we have the same problem as before: we need to declare the return type of `Expr::Concatenate`, which we can only know from its input. So, again, we would need two functions like the ones above (or re-use them) to check that the input types are valid and to compute the return type.
   
   Again, I am just running through the option to see how it would look. Is this what you are thinking, or is it something different?
   
   Regardless of whether it is or not, the critical moment for me in all of this is where we perform the `downcast`, because that is the moment we go from dynamic to static typing, and my understanding is that the main difference between 1 and 3 is where that downcast happens.

