You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/15 14:34:13 UTC

[GitHub] [arrow] jorgecarleitao opened a new pull request #7971: ARROW-9752: [Rust] [DataFusion] Add support for aggregate UDFs

jorgecarleitao opened a new pull request #7971:
URL: https://github.com/apache/arrow/pull/7971


   This PR adds an interface to create aggregate UDFs and migrates all current aggregate expressions to this new interface, thereby unifying everything in a common interface to handle aggregations.
   
   I had to change some tests as our current name assignment for aggregates was a bit funky. This work is based on top of #7967 , as some of the internal changes there are required here.
   
   FYI @andygrove and @alamb 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb edited a comment on pull request #7971: ARROW-9752: [Rust] [DataFusion] Add support for aggregate UDFs

Posted by GitBox <gi...@apache.org>.
alamb edited a comment on pull request #7971:
URL: https://github.com/apache/arrow/pull/7971#issuecomment-674512463


   I left some comments on #7967 and I'll plan to review this PR after that one gets finalized. You are a machine @jorgecarleitao 🚄 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7971: ARROW-9752: [Rust] [DataFusion] Add support for aggregate UDFs

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7971:
URL: https://github.com/apache/arrow/pull/7971#discussion_r471899766



##########
File path: rust/datafusion/src/execution/physical_plan/udf.rs
##########
@@ -146,3 +154,99 @@ impl PhysicalExpr for ScalarFunctionExpr {
         (fun)(&inputs)
     }
 }
+
+/// A generic aggregate function
+/*
+An aggregate function accepts an arbitrary number of arguments, of arbitrary data types,
+and returns an arbitrary type based on the incoming types.
+
+It is the developer of the function's responsibility to ensure that the aggregator correctly handles the different
+types that are presented to them, and that the return type correctly matches the type returned by the
+aggregator.
+
+It is the user of the function's responsibility to pass arguments to the function that have valid types.
+*/
+#[derive(Clone)]
+pub struct AggregateFunction {
+    /// Function name
+    pub name: String,
+    /// A list of arguments and their respective types. A function can accept more than one type as argument
+    /// (e.g. sum(i8), sum(u8)).
+    pub arg_types: Vec<Vec<DataType>>,
+    /// Return type. This function takes
+    pub return_type: ReturnType,

Review comment:
       This change and is under discussion in the mailing list.
   
   Essentially, the question is whether we should accept UDFs to have an input-dependent type or not (should this be a function or a DataType).
   
   If we decide to not accept input-dependent types, then UDFs are simpler (multiple input types, single output type), but we can't re-write our aggregates as UDFs
   
   If we decide to accept input-dependent types, then UDFs are more complex (multiple input types, multiple output type), but we can uniformise them all in a single interface in our code.
   
   We can also do something in the middle, on which we declare an interface for functions in our end that support (multiple input types, multiple output type), but only expose public interfaces to register (multiple input types, single output type) UDFs.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #7971: ARROW-9752: [Rust] [DataFusion] Add support for aggregate UDFs

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7971:
URL: https://github.com/apache/arrow/pull/7971#issuecomment-674407010


   https://issues.apache.org/jira/browse/ARROW-9752


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on pull request #7971: ARROW-9752: [Rust] [DataFusion] Add support for aggregate UDFs

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7971:
URL: https://github.com/apache/arrow/pull/7971#issuecomment-675442799


   Looking briefly at Spark, it seems that user defined aggregates have a single defined return type:
   https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html
   
   ```
     // This is the output type of your aggregatation function.
     override def dataType: DataType = DoubleType
   ```
   
   I am not aware of other analytic frameworks that allow users to specify User defined functions that have changeable type


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7971: ARROW-9752: [Rust] [DataFusion] Add support for aggregate UDFs

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7971:
URL: https://github.com/apache/arrow/pull/7971#discussion_r471899766



##########
File path: rust/datafusion/src/execution/physical_plan/udf.rs
##########
@@ -146,3 +154,99 @@ impl PhysicalExpr for ScalarFunctionExpr {
         (fun)(&inputs)
     }
 }
+
+/// A generic aggregate function
+/*
+An aggregate function accepts an arbitrary number of arguments, of arbitrary data types,
+and returns an arbitrary type based on the incoming types.
+
+It is the developer of the function's responsibility to ensure that the aggregator correctly handles the different
+types that are presented to them, and that the return type correctly matches the type returned by the
+aggregator.
+
+It is the user of the function's responsibility to pass arguments to the function that have valid types.
+*/
+#[derive(Clone)]
+pub struct AggregateFunction {
+    /// Function name
+    pub name: String,
+    /// A list of arguments and their respective types. A function can accept more than one type as argument
+    /// (e.g. sum(i8), sum(u8)).
+    pub arg_types: Vec<Vec<DataType>>,
+    /// Return type. This function takes
+    pub return_type: ReturnType,

Review comment:
       This change and is under discussion in the mailing list.
   
   Essentially, the question is whether we should accept UDFs to have an input-dependent type or not (should this be a function or a DataType).
   
   If we decide to not accept input-dependent types, then UDFs are simpler (multiple input types, single output type), but we can't re-write our aggregates as UDFs
   
   If we decide to accept input-dependent types, then UDFs are more complex (multiple input types, multiple output type), and we can uniformize them all in a single interface.
   
   We can also do something in the middle, on which we declare an interface for functions in our end that support (multiple input types, multiple output type), but only expose public interfaces to register (multiple input types, single output type) UDFs.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #7971: ARROW-9752: [Rust] [DataFusion] Add support for aggregate UDFs

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7971:
URL: https://github.com/apache/arrow/pull/7971#issuecomment-675254823


   I took a very quick look at Spark just now and here are some observations:
   
   - math expressions such as sqrt always return double and don't try to
   optimize to smaller types
   - aggregate expressions min/max return the same type as their input
   - sum returns long, double, or decimal, depending on the input type
   
   
   
   
   On Mon, Aug 17, 2020 at 10:42 PM Andy Grove <an...@gmail.com> wrote:
   
   > When faced with choices like this, it is often helpful to look at how
   > other projects implement this. Perhaps we could look at calcite or spark to
   > see what choices they made? I am more familiar with spark at this point so
   > could research the approach used there.
   >
   > On Mon, Aug 17, 2020, 9:59 PM Jorge Leitao <no...@github.com>
   > wrote:
   >
   >> *@jorgecarleitao* commented on this pull request.
   >> ------------------------------
   >>
   >> In rust/datafusion/src/execution/physical_plan/udf.rs
   >> <https://github.com/apache/arrow/pull/7971#discussion_r471899766>:
   >>
   >> > +
   >> +It is the developer of the function's responsibility to ensure that the aggregator correctly handles the different
   >> +types that are presented to them, and that the return type correctly matches the type returned by the
   >> +aggregator.
   >> +
   >> +It is the user of the function's responsibility to pass arguments to the function that have valid types.
   >> +*/
   >> +#[derive(Clone)]
   >> +pub struct AggregateFunction {
   >> +    /// Function name
   >> +    pub name: String,
   >> +    /// A list of arguments and their respective types. A function can accept more than one type as argument
   >> +    /// (e.g. sum(i8), sum(u8)).
   >> +    pub arg_types: Vec<Vec<DataType>>,
   >> +    /// Return type. This function takes
   >> +    pub return_type: ReturnType,
   >>
   >> This change and is under discussion in the mailing list.
   >>
   >> Essentially, the question is whether we should accept UDFs to have an
   >> input-dependent type or not (should this be a function or a DataType).
   >>
   >> If we decide to not accept input-dependent types, then UDFs are simpler
   >> (multiple input types, single output type), but we can't re-write our
   >> aggregates as UDFs
   >>
   >> If we decide to accept input-dependent types, then UDFs are more complex
   >> (multiple input types, multiple output type), and we can uniformize them
   >> all in a single interface.
   >>
   >> We can also do something in the middle, on which we declare an interface
   >> for functions in our end that support (multiple input types, multiple
   >> output type), but only expose public interfaces to register (multiple input
   >> types, single output type) UDFs.
   >>
   >> —
   >> You are receiving this because you were mentioned.
   >> Reply to this email directly, view it on GitHub
   >> <https://github.com/apache/arrow/pull/7971#pullrequestreview-468974772>,
   >> or unsubscribe
   >> <https://github.com/notifications/unsubscribe-auth/AAHEBRBWO7BL54QSCQ7DPWDSBH4DZANCNFSM4QAJVXOA>
   >> .
   >>
   >
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #7971: ARROW-9752: [Rust] [DataFusion] Add support for aggregate UDFs

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7971:
URL: https://github.com/apache/arrow/pull/7971#issuecomment-674411381


   This is looking good.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on pull request #7971: ARROW-9752: [Rust] [DataFusion] Add support for aggregate UDFs

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7971:
URL: https://github.com/apache/arrow/pull/7971#issuecomment-675042360


   @andygrove , since I rebased this branch on top of master with the changes of the lock of scalar_functions, the tests consistently halt. Do you or @alamb have any hints on what this may be?
   
   Note that this introduces a second set of functions (aggregates). I used the same code as for scalars for it. 
   
   Also/alternatively, do you have any tips on how to debug when such things happen in rust?
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #7971: ARROW-9752: [Rust] [DataFusion] Add support for aggregate UDFs

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7971:
URL: https://github.com/apache/arrow/pull/7971#issuecomment-675245608


   When faced with choices like this, it is often helpful to look at how other
   projects implement this. Perhaps we could look at calcite or spark to see
   what choices they made? I am more familiar with spark at this point so
   could research the approach used there.
   
   On Mon, Aug 17, 2020, 9:59 PM Jorge Leitao <no...@github.com> wrote:
   
   > *@jorgecarleitao* commented on this pull request.
   > ------------------------------
   >
   > In rust/datafusion/src/execution/physical_plan/udf.rs
   > <https://github.com/apache/arrow/pull/7971#discussion_r471899766>:
   >
   > > +
   > +It is the developer of the function's responsibility to ensure that the aggregator correctly handles the different
   > +types that are presented to them, and that the return type correctly matches the type returned by the
   > +aggregator.
   > +
   > +It is the user of the function's responsibility to pass arguments to the function that have valid types.
   > +*/
   > +#[derive(Clone)]
   > +pub struct AggregateFunction {
   > +    /// Function name
   > +    pub name: String,
   > +    /// A list of arguments and their respective types. A function can accept more than one type as argument
   > +    /// (e.g. sum(i8), sum(u8)).
   > +    pub arg_types: Vec<Vec<DataType>>,
   > +    /// Return type. This function takes
   > +    pub return_type: ReturnType,
   >
   > This change and is under discussion in the mailing list.
   >
   > Essentially, the question is whether we should accept UDFs to have an
   > input-dependent type or not (should this be a function or a DataType).
   >
   > If we decide to not accept input-dependent types, then UDFs are simpler
   > (multiple input types, single output type), but we can't re-write our
   > aggregates as UDFs
   >
   > If we decide to accept input-dependent types, then UDFs are more complex
   > (multiple input types, multiple output type), and we can uniformize them
   > all in a single interface.
   >
   > We can also do something in the middle, on which we declare an interface
   > for functions in our end that support (multiple input types, multiple
   > output type), but only expose public interfaces to register (multiple input
   > types, single output type) UDFs.
   >
   > —
   > You are receiving this because you were mentioned.
   > Reply to this email directly, view it on GitHub
   > <https://github.com/apache/arrow/pull/7971#pullrequestreview-468974772>,
   > or unsubscribe
   > <https://github.com/notifications/unsubscribe-auth/AAHEBRBWO7BL54QSCQ7DPWDSBH4DZANCNFSM4QAJVXOA>
   > .
   >
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on pull request #7971: ARROW-9752: [Rust] [DataFusion] Add support for aggregate UDFs

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7971:
URL: https://github.com/apache/arrow/pull/7971#issuecomment-675520454


   I am fine with either option we offer to our users. My concern at the moment is not the public API, but the internal one: after spending some time working on this code base, I now do believe that we are much better served by:
   
   1. describing all our internal functions as generic functions with a variable output type (that may be a constant function)
   2. migrate all our aggregates to generic functions.
   
   With this, we have a very simple way to add new functions, both aggregates and scalars, both with variable and fixed types.
   
   With this in place, we can trivially expose an interface to our users that supports a single return type; we just need to change our public interface to narrow the function registration to accept a single type.
   
   Regardless, I am closing this as I can see that @andygrove plans to work over this functionality. I will help on the review.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao edited a comment on pull request #7971: ARROW-9752: [Rust] [DataFusion] Add support for aggregate UDFs

Posted by GitBox <gi...@apache.org>.
jorgecarleitao edited a comment on pull request #7971:
URL: https://github.com/apache/arrow/pull/7971#issuecomment-675520454


   I am fine with either option we offer to our users. My concern at the moment is not the public API, but the internal one: after spending some time working on this code base, I now do believe that we are much better served by:
   
   1. describing all our internal functions as generic functions with a variable output type (that may be a constant function)
   2. migrate all our aggregates to generic functions.
   
   With this, we have a very simple way to add new functions, both aggregates and scalars, both with variable and fixed types.
   
   With this in place, we can trivially expose an interface to our users that supports a single return type; we just need to change our public interface to narrow the function registration to accept a type, and move that type to inside a closure that we plug in our internal API.
   
   Regardless, I am closing this as I can see that @andygrove plans to work over this functionality. I will help on the review.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7971: ARROW-9752: [Rust] [DataFusion] Add support for aggregate UDFs

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7971:
URL: https://github.com/apache/arrow/pull/7971#discussion_r471894969



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -509,33 +534,76 @@ impl SchemaProvider for ExecutionContextState {
     }
 
     fn get_function_meta(&self, name: &str) -> Option<Arc<FunctionMeta>> {
-        self.scalar_functions
+        let scalar = self
+            .scalar_functions
             .lock()
             .expect("failed to lock mutex")
             .get(name)
             .map(|f| {
                 Arc::new(FunctionMeta::new(
                     name.to_owned(),
-                    f.args.clone(),
+                    f.arg_types.clone(),
                     f.return_type.clone(),
                     FunctionType::Scalar,
                 ))
+            });
+        // give priority to scalar functions
+        if scalar.is_some() {
+            return scalar;
+        }
+
+        self.aggregate_functions
+            .lock()
+            .expect("failed to lock mutex")
+            .get(name)
+            .map(|f| {
+                Arc::new(FunctionMeta::new(
+                    name.to_owned(),
+                    f.arg_types.clone(),
+                    // this is wrong, but the actual type is overwritten by the physical plan
+                    // as aggregate functions have a variable type.
+                    DataType::Float32,

Review comment:
       @alamb  this is one of the issues that the thread in the mailing list is about: the return type in FunctionMeta is constant, but aggregates in our code base have a variable type. As such, we can't represent our aggregates as generic functions, which implies that we can't reconcile the logical and physical plans.
   
   [This statement](https://github.com/apache/arrow/blob/2f98d1e65ef8821aecd9f716cfd6176f21315969/rust/datafusion/src/sql/planner.rs#L491) is our current hack to this issue: we hard-code in the sql planner that the return type is consistent with the physical expressions.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on pull request #7971: ARROW-9752: [Rust] [DataFusion] Add support for aggregate UDFs

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7971:
URL: https://github.com/apache/arrow/pull/7971#issuecomment-674484314


   This is now ready to review, but since it is based on #7967 , let's review that one first.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7971: ARROW-9752: [Rust] [DataFusion] Add support for aggregate UDFs

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7971:
URL: https://github.com/apache/arrow/pull/7971#discussion_r471706745



##########
File path: rust/datafusion/src/execution/physical_plan/udf.rs
##########
@@ -20,25 +20,33 @@
 use std::fmt;
 
 use arrow::array::ArrayRef;
-use arrow::datatypes::{DataType, Field, Schema};
+use arrow::datatypes::{DataType, Schema};
 
 use crate::error::Result;
 use crate::execution::physical_plan::PhysicalExpr;
 
+use super::{Accumulator, AggregateExpr, Aggregator};
 use arrow::record_batch::RecordBatch;
 use fmt::{Debug, Formatter};
-use std::sync::Arc;
+use std::{cell::RefCell, rc::Rc, sync::Arc};
 
 /// Scalar UDF
 pub type ScalarUdf = Arc<dyn Fn(&[ArrayRef]) -> Result<ArrayRef> + Send + Sync>;
 
+/// Function to construct the return type of a function given its arguments.
+pub type ReturnType =

Review comment:
       This can't be an HashMap because the number of arguments is unknown and thus we don't know the key length a priori :(




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao closed pull request #7971: ARROW-9752: [Rust] [DataFusion] Add support for aggregate UDFs

Posted by GitBox <gi...@apache.org>.
jorgecarleitao closed pull request #7971:
URL: https://github.com/apache/arrow/pull/7971


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on pull request #7971: ARROW-9752: [Rust] [DataFusion] Add support for aggregate UDFs

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7971:
URL: https://github.com/apache/arrow/pull/7971#issuecomment-674512463


   I left some comments on #7967 and I'll plan to review this PR after that one gets finalized. You are a machine @jorgecarleitao 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org