You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text version.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/08 12:59:59 UTC

[arrow-datafusion] 01/02: udf and udaf

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-udf-udaf-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 310927b7c4c648a75384342fc4ca3b60a93acf54
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Feb 8 20:53:42 2022 +0800

    udf and udaf
---
 datafusion-expr/src/{lib.rs => expr.rs}            | 18 -----
 datafusion-expr/src/lib.rs                         |  5 ++
 .../udf.rs => datafusion-expr/src/udaf.rs          | 83 +++++++++-------------
 .../physical_plan => datafusion-expr/src}/udf.rs   | 23 ------
 datafusion/src/physical_plan/udf.rs                | 69 +-----------------
 5 files changed, 38 insertions(+), 160 deletions(-)

diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/expr.rs
similarity index 58%
copy from datafusion-expr/src/lib.rs
copy to datafusion-expr/src/expr.rs
index 2491fcf..b248758 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/expr.rs
@@ -14,21 +14,3 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
-mod accumulator;
-mod aggregate_function;
-mod built_in_function;
-mod columnar_value;
-mod operator;
-mod signature;
-mod window_frame;
-mod window_function;
-
-pub use accumulator::Accumulator;
-pub use aggregate_function::AggregateFunction;
-pub use built_in_function::BuiltinScalarFunction;
-pub use columnar_value::{ColumnarValue, NullColumnarValue};
-pub use operator::Operator;
-pub use signature::{Signature, TypeSignature, Volatility};
-pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
-pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index 2491fcf..9e3d761 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -19,8 +19,11 @@ mod accumulator;
 mod aggregate_function;
 mod built_in_function;
 mod columnar_value;
+mod expr;
 mod operator;
 mod signature;
+mod udaf;
+mod udf;
 mod window_frame;
 mod window_function;
 
@@ -30,5 +33,7 @@ pub use built_in_function::BuiltinScalarFunction;
 pub use columnar_value::{ColumnarValue, NullColumnarValue};
 pub use operator::Operator;
 pub use signature::{Signature, TypeSignature, Volatility};
+pub use udaf::AggregateUDF;
+pub use udf::ScalarUDF;
 pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
 pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion-expr/src/udaf.rs
similarity index 50%
copy from datafusion/src/physical_plan/udf.rs
copy to datafusion-expr/src/udaf.rs
index 7355746..3fea4d9 100644
--- a/datafusion/src/physical_plan/udf.rs
+++ b/datafusion-expr/src/udaf.rs
@@ -15,27 +15,34 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! UDF support
+//! This module contains functions and structs supporting user-defined aggregate functions.
 
 use fmt::{Debug, Formatter};
+use std::any::Any;
 use std::fmt;
 
-use arrow::datatypes::Schema;
+use arrow::{
+    datatypes::Field,
+    datatypes::{DataType, Schema},
+};
 
-use crate::error::Result;
-use crate::{logical_plan::Expr, physical_plan::PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
+use crate::{error::Result, logical_plan::Expr};
 
 use super::{
-    functions::{
-        ReturnTypeFunction, ScalarFunctionExpr, ScalarFunctionImplementation, Signature,
-    },
+    aggregates::AccumulatorFunctionImplementation,
+    aggregates::StateTypeFunction,
+    expressions::format_state_name,
+    functions::{ReturnTypeFunction, Signature},
     type_coercion::coerce,
+    Accumulator, AggregateExpr,
 };
 use std::sync::Arc;
 
-/// Logical representation of a UDF.
+/// Logical representation of a user-defined aggregate function (UDAF)
+/// A UDAF is different from a UDF in that it is stateful across batches.
 #[derive(Clone)]
-pub struct ScalarUDF {
+pub struct AggregateUDF {
     /// name
     pub name: String,
     /// signature
@@ -43,19 +50,14 @@ pub struct ScalarUDF {
     /// Return type
     pub return_type: ReturnTypeFunction,
     /// actual implementation
-    ///
-    /// The fn param is the wrapped function but be aware that the function will
-    /// be passed with the slice / vec of columnar values (either scalar or array)
-    /// with the exception of zero param function, where a singular element vec
-    /// will be passed. In that case the single element is a null array to indicate
-    /// the batch's row count (so that the generative zero-argument function can know
-    /// the result array size).
-    pub fun: ScalarFunctionImplementation,
+    pub accumulator: AccumulatorFunctionImplementation,
+    /// the accumulator's state's description as a function of the return type
+    pub state_type: StateTypeFunction,
 }
 
-impl Debug for ScalarUDF {
+impl Debug for AggregateUDF {
     fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("ScalarUDF")
+        f.debug_struct("AggregateUDF")
             .field("name", &self.name)
             .field("signature", &self.signature)
             .field("fun", &"<FUNC>")
@@ -63,64 +65,43 @@ impl Debug for ScalarUDF {
     }
 }
 
-impl PartialEq for ScalarUDF {
+impl PartialEq for AggregateUDF {
     fn eq(&self, other: &Self) -> bool {
         self.name == other.name && self.signature == other.signature
     }
 }
 
-impl std::hash::Hash for ScalarUDF {
+impl std::hash::Hash for AggregateUDF {
     fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
         self.name.hash(state);
         self.signature.hash(state);
     }
 }
 
-impl ScalarUDF {
-    /// Create a new ScalarUDF
+impl AggregateUDF {
+    /// Create a new AggregateUDF
     pub fn new(
         name: &str,
         signature: &Signature,
         return_type: &ReturnTypeFunction,
-        fun: &ScalarFunctionImplementation,
+        accumulator: &AccumulatorFunctionImplementation,
+        state_type: &StateTypeFunction,
     ) -> Self {
         Self {
             name: name.to_owned(),
             signature: signature.clone(),
             return_type: return_type.clone(),
-            fun: fun.clone(),
+            accumulator: accumulator.clone(),
+            state_type: state_type.clone(),
         }
     }
 
-    /// creates a logical expression with a call of the UDF
-    /// This utility allows using the UDF without requiring access to the registry.
+    /// creates a logical expression with a call of the UDAF
+    /// This utility allows using the UDAF without requiring access to the registry.
     pub fn call(&self, args: Vec<Expr>) -> Expr {
-        Expr::ScalarUDF {
+        Expr::AggregateUDF {
             fun: Arc::new(self.clone()),
             args,
         }
     }
 }
-
-/// Create a physical expression of the UDF.
-/// This function errors when `args`' can't be coerced to a valid argument type of the UDF.
-pub fn create_physical_expr(
-    fun: &ScalarUDF,
-    input_phy_exprs: &[Arc<dyn PhysicalExpr>],
-    input_schema: &Schema,
-) -> Result<Arc<dyn PhysicalExpr>> {
-    // coerce
-    let coerced_phy_exprs = coerce(input_phy_exprs, input_schema, &fun.signature)?;
-
-    let coerced_exprs_types = coerced_phy_exprs
-        .iter()
-        .map(|e| e.data_type(input_schema))
-        .collect::<Result<Vec<_>>>()?;
-
-    Ok(Arc::new(ScalarFunctionExpr::new(
-        &fun.name,
-        fun.fun.clone(),
-        coerced_phy_exprs,
-        (fun.return_type)(&coerced_exprs_types)?.as_ref(),
-    )))
-}
diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion-expr/src/udf.rs
similarity index 81%
copy from datafusion/src/physical_plan/udf.rs
copy to datafusion-expr/src/udf.rs
index 7355746..9e7bebc 100644
--- a/datafusion/src/physical_plan/udf.rs
+++ b/datafusion-expr/src/udf.rs
@@ -101,26 +101,3 @@ impl ScalarUDF {
         }
     }
 }
-
-/// Create a physical expression of the UDF.
-/// This function errors when `args`' can't be coerced to a valid argument type of the UDF.
-pub fn create_physical_expr(
-    fun: &ScalarUDF,
-    input_phy_exprs: &[Arc<dyn PhysicalExpr>],
-    input_schema: &Schema,
-) -> Result<Arc<dyn PhysicalExpr>> {
-    // coerce
-    let coerced_phy_exprs = coerce(input_phy_exprs, input_schema, &fun.signature)?;
-
-    let coerced_exprs_types = coerced_phy_exprs
-        .iter()
-        .map(|e| e.data_type(input_schema))
-        .collect::<Result<Vec<_>>>()?;
-
-    Ok(Arc::new(ScalarFunctionExpr::new(
-        &fun.name,
-        fun.fun.clone(),
-        coerced_phy_exprs,
-        (fun.return_type)(&coerced_exprs_types)?.as_ref(),
-    )))
-}
diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion/src/physical_plan/udf.rs
index 7355746..85e6b02 100644
--- a/datafusion/src/physical_plan/udf.rs
+++ b/datafusion/src/physical_plan/udf.rs
@@ -33,74 +33,7 @@ use super::{
 };
 use std::sync::Arc;
 
-/// Logical representation of a UDF.
-#[derive(Clone)]
-pub struct ScalarUDF {
-    /// name
-    pub name: String,
-    /// signature
-    pub signature: Signature,
-    /// Return type
-    pub return_type: ReturnTypeFunction,
-    /// actual implementation
-    ///
-    /// The fn param is the wrapped function but be aware that the function will
-    /// be passed with the slice / vec of columnar values (either scalar or array)
-    /// with the exception of zero param function, where a singular element vec
-    /// will be passed. In that case the single element is a null array to indicate
-    /// the batch's row count (so that the generative zero-argument function can know
-    /// the result array size).
-    pub fun: ScalarFunctionImplementation,
-}
-
-impl Debug for ScalarUDF {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("ScalarUDF")
-            .field("name", &self.name)
-            .field("signature", &self.signature)
-            .field("fun", &"<FUNC>")
-            .finish()
-    }
-}
-
-impl PartialEq for ScalarUDF {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name && self.signature == other.signature
-    }
-}
-
-impl std::hash::Hash for ScalarUDF {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-        self.signature.hash(state);
-    }
-}
-
-impl ScalarUDF {
-    /// Create a new ScalarUDF
-    pub fn new(
-        name: &str,
-        signature: &Signature,
-        return_type: &ReturnTypeFunction,
-        fun: &ScalarFunctionImplementation,
-    ) -> Self {
-        Self {
-            name: name.to_owned(),
-            signature: signature.clone(),
-            return_type: return_type.clone(),
-            fun: fun.clone(),
-        }
-    }
-
-    /// creates a logical expression with a call of the UDF
-    /// This utility allows using the UDF without requiring access to the registry.
-    pub fn call(&self, args: Vec<Expr>) -> Expr {
-        Expr::ScalarUDF {
-            fun: Arc::new(self.clone()),
-            args,
-        }
-    }
-}
+pub use datafusion_expr::ScalarUDF;
 
 /// Create a physical expression of the UDF.
 /// This function errors when `args`' can't be coerced to a valid argument type of the UDF.