You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/08 12:59:58 UTC

[arrow-datafusion] branch move-udf-udaf-expr created (now d5eb258)

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a change to branch move-udf-udaf-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git.


      at d5eb258  fix imported types

This branch includes the following new commits:

     new 310927b  udf and udaf
     new d5eb258  fix imported types

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[arrow-datafusion] 02/02: fix imported types

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-udf-udaf-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit d5eb258049e559d3d93d8f1ff40c473234f90f37
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Feb 8 20:59:43 2022 +0800

    fix imported types
---
 datafusion-expr/src/lib.rs                 |  28 +++++++
 datafusion-expr/src/udaf.rs                | 115 +++++++++++++----------------
 datafusion-expr/src/udf.rs                 | 114 +++++++++++++---------------
 datafusion/src/physical_plan/aggregates.rs |   9 ---
 datafusion/src/physical_plan/functions.rs  |  15 ----
 datafusion/src/physical_plan/udaf.rs       |  67 +----------------
 6 files changed, 131 insertions(+), 217 deletions(-)

diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index 9e3d761..69d5a00 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -27,6 +27,34 @@ mod udf;
 mod window_frame;
 mod window_function;
 
+use arrow::datatypes::DataType;
+use datafusion_common::Result;
+use std::sync::Arc;
+
+/// Scalar function
+///
+/// The Fn param is the wrapped function but be aware that the function will
+/// be passed with the slice / vec of columnar values (either scalar or array)
+/// with the exception of zero param function, where a singular element vec
+/// will be passed. In that case the single element is a null array to indicate
+/// the batch's row count (so that the generative zero-argument function can know
+/// the result array size).
+pub type ScalarFunctionImplementation =
+  Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
+
+/// A function's return type
+pub type ReturnTypeFunction =
+  Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
+
+/// the implementation of an aggregate function
+pub type AccumulatorFunctionImplementation =
+  Arc<dyn Fn() -> Result<Box<dyn Accumulator>> + Send + Sync>;
+
+/// This signature corresponds to which types an aggregator serializes
+/// its state, given its return datatype.
+pub type StateTypeFunction =
+  Arc<dyn Fn(&DataType) -> Result<Arc<Vec<DataType>>> + Send + Sync>;
+
 pub use accumulator::Accumulator;
 pub use aggregate_function::AggregateFunction;
 pub use built_in_function::BuiltinScalarFunction;
diff --git a/datafusion-expr/src/udaf.rs b/datafusion-expr/src/udaf.rs
index 3fea4d9..142cfe1 100644
--- a/datafusion-expr/src/udaf.rs
+++ b/datafusion-expr/src/udaf.rs
@@ -17,91 +17,76 @@
 
 //! This module contains functions and structs supporting user-defined aggregate functions.
 
-use fmt::{Debug, Formatter};
-use std::any::Any;
-use std::fmt;
-
-use arrow::{
-    datatypes::Field,
-    datatypes::{DataType, Schema},
-};
-
-use crate::physical_plan::PhysicalExpr;
-use crate::{error::Result, logical_plan::Expr};
-
-use super::{
-    aggregates::AccumulatorFunctionImplementation,
-    aggregates::StateTypeFunction,
-    expressions::format_state_name,
-    functions::{ReturnTypeFunction, Signature},
-    type_coercion::coerce,
-    Accumulator, AggregateExpr,
+use crate::Expr;
+use crate::{
+  AccumulatorFunctionImplementation, ReturnTypeFunction, Signature, StateTypeFunction,
 };
+use std::fmt::{self, Debug, Formatter};
 use std::sync::Arc;
 
 /// Logical representation of a user-defined aggregate function (UDAF)
 /// A UDAF is different from a UDF in that it is stateful across batches.
 #[derive(Clone)]
 pub struct AggregateUDF {
-    /// name
-    pub name: String,
-    /// signature
-    pub signature: Signature,
-    /// Return type
-    pub return_type: ReturnTypeFunction,
-    /// actual implementation
-    pub accumulator: AccumulatorFunctionImplementation,
-    /// the accumulator's state's description as a function of the return type
-    pub state_type: StateTypeFunction,
+  /// name
+  pub name: String,
+  /// signature
+  pub signature: Signature,
+  /// Return type
+  pub return_type: ReturnTypeFunction,
+  /// actual implementation
+  pub accumulator: AccumulatorFunctionImplementation,
+  /// the accumulator's state's description as a function of the return type
+  pub state_type: StateTypeFunction,
 }
 
 impl Debug for AggregateUDF {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("AggregateUDF")
-            .field("name", &self.name)
-            .field("signature", &self.signature)
-            .field("fun", &"<FUNC>")
-            .finish()
-    }
+  fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+    f.debug_struct("AggregateUDF")
+      .field("name", &self.name)
+      .field("signature", &self.signature)
+      .field("fun", &"<FUNC>")
+      .finish()
+  }
 }
 
 impl PartialEq for AggregateUDF {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name && self.signature == other.signature
-    }
+  fn eq(&self, other: &Self) -> bool {
+    self.name == other.name && self.signature == other.signature
+  }
 }
 
 impl std::hash::Hash for AggregateUDF {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-        self.signature.hash(state);
-    }
+  fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+    self.name.hash(state);
+    self.signature.hash(state);
+  }
 }
 
 impl AggregateUDF {
-    /// Create a new AggregateUDF
-    pub fn new(
-        name: &str,
-        signature: &Signature,
-        return_type: &ReturnTypeFunction,
-        accumulator: &AccumulatorFunctionImplementation,
-        state_type: &StateTypeFunction,
-    ) -> Self {
-        Self {
-            name: name.to_owned(),
-            signature: signature.clone(),
-            return_type: return_type.clone(),
-            accumulator: accumulator.clone(),
-            state_type: state_type.clone(),
-        }
+  /// Create a new AggregateUDF
+  pub fn new(
+    name: &str,
+    signature: &Signature,
+    return_type: &ReturnTypeFunction,
+    accumulator: &AccumulatorFunctionImplementation,
+    state_type: &StateTypeFunction,
+  ) -> Self {
+    Self {
+      name: name.to_owned(),
+      signature: signature.clone(),
+      return_type: return_type.clone(),
+      accumulator: accumulator.clone(),
+      state_type: state_type.clone(),
     }
+  }
 
-    /// creates a logical expression with a call of the UDAF
-    /// This utility allows using the UDAF without requiring access to the registry.
-    pub fn call(&self, args: Vec<Expr>) -> Expr {
-        Expr::AggregateUDF {
-            fun: Arc::new(self.clone()),
-            args,
-        }
+  /// creates a logical expression with a call of the UDAF
+  /// This utility allows using the UDAF without requiring access to the registry.
+  pub fn call(&self, args: Vec<Expr>) -> Expr {
+    Expr::AggregateUDF {
+      fun: Arc::new(self.clone()),
+      args,
     }
+  }
 }
diff --git a/datafusion-expr/src/udf.rs b/datafusion-expr/src/udf.rs
index 9e7bebc..247d6a2 100644
--- a/datafusion-expr/src/udf.rs
+++ b/datafusion-expr/src/udf.rs
@@ -17,87 +17,77 @@
 
 //! UDF support
 
-use fmt::{Debug, Formatter};
+use crate::{Expr, ReturnTypeFunction, ScalarFunctionImplementation, Signature};
 use std::fmt;
-
-use arrow::datatypes::Schema;
-
-use crate::error::Result;
-use crate::{logical_plan::Expr, physical_plan::PhysicalExpr};
-
-use super::{
-    functions::{
-        ReturnTypeFunction, ScalarFunctionExpr, ScalarFunctionImplementation, Signature,
-    },
-    type_coercion::coerce,
-};
+use std::fmt::Debug;
+use std::fmt::Formatter;
 use std::sync::Arc;
 
 /// Logical representation of a UDF.
 #[derive(Clone)]
 pub struct ScalarUDF {
-    /// name
-    pub name: String,
-    /// signature
-    pub signature: Signature,
-    /// Return type
-    pub return_type: ReturnTypeFunction,
-    /// actual implementation
-    ///
-    /// The fn param is the wrapped function but be aware that the function will
-    /// be passed with the slice / vec of columnar values (either scalar or array)
-    /// with the exception of zero param function, where a singular element vec
-    /// will be passed. In that case the single element is a null array to indicate
-    /// the batch's row count (so that the generative zero-argument function can know
-    /// the result array size).
-    pub fun: ScalarFunctionImplementation,
+  /// name
+  pub name: String,
+  /// signature
+  pub signature: Signature,
+  /// Return type
+  pub return_type: ReturnTypeFunction,
+  /// actual implementation
+  ///
+  /// The fn param is the wrapped function but be aware that the function will
+  /// be passed with the slice / vec of columnar values (either scalar or array)
+  /// with the exception of zero param function, where a singular element vec
+  /// will be passed. In that case the single element is a null array to indicate
+  /// the batch's row count (so that the generative zero-argument function can know
+  /// the result array size).
+  pub fun: ScalarFunctionImplementation,
 }
 
 impl Debug for ScalarUDF {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("ScalarUDF")
-            .field("name", &self.name)
-            .field("signature", &self.signature)
-            .field("fun", &"<FUNC>")
-            .finish()
-    }
+  fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+    f.debug_struct("ScalarUDF")
+      .field("name", &self.name)
+      .field("signature", &self.signature)
+      .field("fun", &"<FUNC>")
+      .finish()
+  }
 }
 
 impl PartialEq for ScalarUDF {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name && self.signature == other.signature
-    }
+  fn eq(&self, other: &Self) -> bool {
+    self.name == other.name && self.signature == other.signature
+  }
 }
 
 impl std::hash::Hash for ScalarUDF {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-        self.signature.hash(state);
-    }
+  fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+    self.name.hash(state);
+    self.signature.hash(state);
+  }
 }
 
 impl ScalarUDF {
-    /// Create a new ScalarUDF
-    pub fn new(
-        name: &str,
-        signature: &Signature,
-        return_type: &ReturnTypeFunction,
-        fun: &ScalarFunctionImplementation,
-    ) -> Self {
-        Self {
-            name: name.to_owned(),
-            signature: signature.clone(),
-            return_type: return_type.clone(),
-            fun: fun.clone(),
-        }
+  /// Create a new ScalarUDF
+  pub fn new(
+    name: &str,
+    signature: &Signature,
+    return_type: &ReturnTypeFunction,
+    fun: &ScalarFunctionImplementation,
+  ) -> Self {
+    Self {
+      name: name.to_owned(),
+      signature: signature.clone(),
+      return_type: return_type.clone(),
+      fun: fun.clone(),
     }
+  }
 
-    /// creates a logical expression with a call of the UDF
-    /// This utility allows using the UDF without requiring access to the registry.
-    pub fn call(&self, args: Vec<Expr>) -> Expr {
-        Expr::ScalarUDF {
-            fun: Arc::new(self.clone()),
-            args,
-        }
+  /// creates a logical expression with a call of the UDF
+  /// This utility allows using the UDF without requiring access to the registry.
+  pub fn call(&self, args: Vec<Expr>) -> Expr {
+    Expr::ScalarUDF {
+      fun: Arc::new(self.clone()),
+      args,
     }
+  }
 }
diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs
index a1531d4..656b9c8 100644
--- a/datafusion/src/physical_plan/aggregates.rs
+++ b/datafusion/src/physical_plan/aggregates.rs
@@ -40,15 +40,6 @@ use expressions::{
 };
 use std::sync::Arc;
 
-/// the implementation of an aggregate function
-pub type AccumulatorFunctionImplementation =
-    Arc<dyn Fn() -> Result<Box<dyn Accumulator>> + Send + Sync>;
-
-/// This signature corresponds to which types an aggregator serializes
-/// its state, given its return datatype.
-pub type StateTypeFunction =
-    Arc<dyn Fn(&DataType) -> Result<Arc<Vec<DataType>>> + Send + Sync>;
-
 pub use datafusion_expr::AggregateFunction;
 
 /// Returns the datatype of the aggregate function.
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index 56e8f17..586b2ca 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -59,21 +59,6 @@ use std::{any::Any, fmt, sync::Arc};
 
 pub use datafusion_expr::{BuiltinScalarFunction, Signature, TypeSignature, Volatility};
 
-/// Scalar function
-///
-/// The Fn param is the wrapped function but be aware that the function will
-/// be passed with the slice / vec of columnar values (either scalar or array)
-/// with the exception of zero param function, where a singular element vec
-/// will be passed. In that case the single element is a null array to indicate
-/// the batch's row count (so that the generative zero-argument function can know
-/// the result array size).
-pub type ScalarFunctionImplementation =
-    Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
-
-/// A function's return type
-pub type ReturnTypeFunction =
-    Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
-
 macro_rules! make_utf8_to_return_type {
     ($FUNC:ident, $largeUtf8Type:expr, $utf8Type:expr) => {
         fn $FUNC(arg_type: &DataType, name: &str) -> Result<DataType> {
diff --git a/datafusion/src/physical_plan/udaf.rs b/datafusion/src/physical_plan/udaf.rs
index 0de696d..0e61cb4 100644
--- a/datafusion/src/physical_plan/udaf.rs
+++ b/datafusion/src/physical_plan/udaf.rs
@@ -39,72 +39,7 @@ use super::{
 };
 use std::sync::Arc;
 
-/// Logical representation of a user-defined aggregate function (UDAF)
-/// A UDAF is different from a UDF in that it is stateful across batches.
-#[derive(Clone)]
-pub struct AggregateUDF {
-    /// name
-    pub name: String,
-    /// signature
-    pub signature: Signature,
-    /// Return type
-    pub return_type: ReturnTypeFunction,
-    /// actual implementation
-    pub accumulator: AccumulatorFunctionImplementation,
-    /// the accumulator's state's description as a function of the return type
-    pub state_type: StateTypeFunction,
-}
-
-impl Debug for AggregateUDF {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("AggregateUDF")
-            .field("name", &self.name)
-            .field("signature", &self.signature)
-            .field("fun", &"<FUNC>")
-            .finish()
-    }
-}
-
-impl PartialEq for AggregateUDF {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name && self.signature == other.signature
-    }
-}
-
-impl std::hash::Hash for AggregateUDF {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-        self.signature.hash(state);
-    }
-}
-
-impl AggregateUDF {
-    /// Create a new AggregateUDF
-    pub fn new(
-        name: &str,
-        signature: &Signature,
-        return_type: &ReturnTypeFunction,
-        accumulator: &AccumulatorFunctionImplementation,
-        state_type: &StateTypeFunction,
-    ) -> Self {
-        Self {
-            name: name.to_owned(),
-            signature: signature.clone(),
-            return_type: return_type.clone(),
-            accumulator: accumulator.clone(),
-            state_type: state_type.clone(),
-        }
-    }
-
-    /// creates a logical expression with a call of the UDAF
-    /// This utility allows using the UDAF without requiring access to the registry.
-    pub fn call(&self, args: Vec<Expr>) -> Expr {
-        Expr::AggregateUDF {
-            fun: Arc::new(self.clone()),
-            args,
-        }
-    }
-}
+pub use datafusion_expr::AggregateUDF;
 
 /// Creates a physical expression of the UDAF, that includes all necessary type coercion.
 /// This function errors when `args`' can't be coerced to a valid argument type of the UDAF.

[arrow-datafusion] 01/02: udf and udaf

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-udf-udaf-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 310927b7c4c648a75384342fc4ca3b60a93acf54
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Feb 8 20:53:42 2022 +0800

    udf and udaf
---
 datafusion-expr/src/{lib.rs => expr.rs}            | 18 -----
 datafusion-expr/src/lib.rs                         |  5 ++
 .../udf.rs => datafusion-expr/src/udaf.rs          | 83 +++++++++-------------
 .../physical_plan => datafusion-expr/src}/udf.rs   | 23 ------
 datafusion/src/physical_plan/udf.rs                | 69 +-----------------
 5 files changed, 38 insertions(+), 160 deletions(-)

diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/expr.rs
similarity index 58%
copy from datafusion-expr/src/lib.rs
copy to datafusion-expr/src/expr.rs
index 2491fcf..b248758 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/expr.rs
@@ -14,21 +14,3 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
-mod accumulator;
-mod aggregate_function;
-mod built_in_function;
-mod columnar_value;
-mod operator;
-mod signature;
-mod window_frame;
-mod window_function;
-
-pub use accumulator::Accumulator;
-pub use aggregate_function::AggregateFunction;
-pub use built_in_function::BuiltinScalarFunction;
-pub use columnar_value::{ColumnarValue, NullColumnarValue};
-pub use operator::Operator;
-pub use signature::{Signature, TypeSignature, Volatility};
-pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
-pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index 2491fcf..9e3d761 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -19,8 +19,11 @@ mod accumulator;
 mod aggregate_function;
 mod built_in_function;
 mod columnar_value;
+mod expr;
 mod operator;
 mod signature;
+mod udaf;
+mod udf;
 mod window_frame;
 mod window_function;
 
@@ -30,5 +33,7 @@ pub use built_in_function::BuiltinScalarFunction;
 pub use columnar_value::{ColumnarValue, NullColumnarValue};
 pub use operator::Operator;
 pub use signature::{Signature, TypeSignature, Volatility};
+pub use udaf::AggregateUDF;
+pub use udf::ScalarUDF;
 pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
 pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion-expr/src/udaf.rs
similarity index 50%
copy from datafusion/src/physical_plan/udf.rs
copy to datafusion-expr/src/udaf.rs
index 7355746..3fea4d9 100644
--- a/datafusion/src/physical_plan/udf.rs
+++ b/datafusion-expr/src/udaf.rs
@@ -15,27 +15,34 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! UDF support
+//! This module contains functions and structs supporting user-defined aggregate functions.
 
 use fmt::{Debug, Formatter};
+use std::any::Any;
 use std::fmt;
 
-use arrow::datatypes::Schema;
+use arrow::{
+    datatypes::Field,
+    datatypes::{DataType, Schema},
+};
 
-use crate::error::Result;
-use crate::{logical_plan::Expr, physical_plan::PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
+use crate::{error::Result, logical_plan::Expr};
 
 use super::{
-    functions::{
-        ReturnTypeFunction, ScalarFunctionExpr, ScalarFunctionImplementation, Signature,
-    },
+    aggregates::AccumulatorFunctionImplementation,
+    aggregates::StateTypeFunction,
+    expressions::format_state_name,
+    functions::{ReturnTypeFunction, Signature},
     type_coercion::coerce,
+    Accumulator, AggregateExpr,
 };
 use std::sync::Arc;
 
-/// Logical representation of a UDF.
+/// Logical representation of a user-defined aggregate function (UDAF)
+/// A UDAF is different from a UDF in that it is stateful across batches.
 #[derive(Clone)]
-pub struct ScalarUDF {
+pub struct AggregateUDF {
     /// name
     pub name: String,
     /// signature
@@ -43,19 +50,14 @@ pub struct ScalarUDF {
     /// Return type
     pub return_type: ReturnTypeFunction,
     /// actual implementation
-    ///
-    /// The fn param is the wrapped function but be aware that the function will
-    /// be passed with the slice / vec of columnar values (either scalar or array)
-    /// with the exception of zero param function, where a singular element vec
-    /// will be passed. In that case the single element is a null array to indicate
-    /// the batch's row count (so that the generative zero-argument function can know
-    /// the result array size).
-    pub fun: ScalarFunctionImplementation,
+    pub accumulator: AccumulatorFunctionImplementation,
+    /// the accumulator's state's description as a function of the return type
+    pub state_type: StateTypeFunction,
 }
 
-impl Debug for ScalarUDF {
+impl Debug for AggregateUDF {
     fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("ScalarUDF")
+        f.debug_struct("AggregateUDF")
             .field("name", &self.name)
             .field("signature", &self.signature)
             .field("fun", &"<FUNC>")
@@ -63,64 +65,43 @@ impl Debug for ScalarUDF {
     }
 }
 
-impl PartialEq for ScalarUDF {
+impl PartialEq for AggregateUDF {
     fn eq(&self, other: &Self) -> bool {
         self.name == other.name && self.signature == other.signature
     }
 }
 
-impl std::hash::Hash for ScalarUDF {
+impl std::hash::Hash for AggregateUDF {
     fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
         self.name.hash(state);
         self.signature.hash(state);
     }
 }
 
-impl ScalarUDF {
-    /// Create a new ScalarUDF
+impl AggregateUDF {
+    /// Create a new AggregateUDF
     pub fn new(
         name: &str,
         signature: &Signature,
         return_type: &ReturnTypeFunction,
-        fun: &ScalarFunctionImplementation,
+        accumulator: &AccumulatorFunctionImplementation,
+        state_type: &StateTypeFunction,
     ) -> Self {
         Self {
             name: name.to_owned(),
             signature: signature.clone(),
             return_type: return_type.clone(),
-            fun: fun.clone(),
+            accumulator: accumulator.clone(),
+            state_type: state_type.clone(),
         }
     }
 
-    /// creates a logical expression with a call of the UDF
-    /// This utility allows using the UDF without requiring access to the registry.
+    /// creates a logical expression with a call of the UDAF
+    /// This utility allows using the UDAF without requiring access to the registry.
     pub fn call(&self, args: Vec<Expr>) -> Expr {
-        Expr::ScalarUDF {
+        Expr::AggregateUDF {
             fun: Arc::new(self.clone()),
             args,
         }
     }
 }
-
-/// Create a physical expression of the UDF.
-/// This function errors when `args`' can't be coerced to a valid argument type of the UDF.
-pub fn create_physical_expr(
-    fun: &ScalarUDF,
-    input_phy_exprs: &[Arc<dyn PhysicalExpr>],
-    input_schema: &Schema,
-) -> Result<Arc<dyn PhysicalExpr>> {
-    // coerce
-    let coerced_phy_exprs = coerce(input_phy_exprs, input_schema, &fun.signature)?;
-
-    let coerced_exprs_types = coerced_phy_exprs
-        .iter()
-        .map(|e| e.data_type(input_schema))
-        .collect::<Result<Vec<_>>>()?;
-
-    Ok(Arc::new(ScalarFunctionExpr::new(
-        &fun.name,
-        fun.fun.clone(),
-        coerced_phy_exprs,
-        (fun.return_type)(&coerced_exprs_types)?.as_ref(),
-    )))
-}
diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion-expr/src/udf.rs
similarity index 81%
copy from datafusion/src/physical_plan/udf.rs
copy to datafusion-expr/src/udf.rs
index 7355746..9e7bebc 100644
--- a/datafusion/src/physical_plan/udf.rs
+++ b/datafusion-expr/src/udf.rs
@@ -101,26 +101,3 @@ impl ScalarUDF {
         }
     }
 }
-
-/// Create a physical expression of the UDF.
-/// This function errors when `args`' can't be coerced to a valid argument type of the UDF.
-pub fn create_physical_expr(
-    fun: &ScalarUDF,
-    input_phy_exprs: &[Arc<dyn PhysicalExpr>],
-    input_schema: &Schema,
-) -> Result<Arc<dyn PhysicalExpr>> {
-    // coerce
-    let coerced_phy_exprs = coerce(input_phy_exprs, input_schema, &fun.signature)?;
-
-    let coerced_exprs_types = coerced_phy_exprs
-        .iter()
-        .map(|e| e.data_type(input_schema))
-        .collect::<Result<Vec<_>>>()?;
-
-    Ok(Arc::new(ScalarFunctionExpr::new(
-        &fun.name,
-        fun.fun.clone(),
-        coerced_phy_exprs,
-        (fun.return_type)(&coerced_exprs_types)?.as_ref(),
-    )))
-}
diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion/src/physical_plan/udf.rs
index 7355746..85e6b02 100644
--- a/datafusion/src/physical_plan/udf.rs
+++ b/datafusion/src/physical_plan/udf.rs
@@ -33,74 +33,7 @@ use super::{
 };
 use std::sync::Arc;
 
-/// Logical representation of a UDF.
-#[derive(Clone)]
-pub struct ScalarUDF {
-    /// name
-    pub name: String,
-    /// signature
-    pub signature: Signature,
-    /// Return type
-    pub return_type: ReturnTypeFunction,
-    /// actual implementation
-    ///
-    /// The fn param is the wrapped function but be aware that the function will
-    /// be passed with the slice / vec of columnar values (either scalar or array)
-    /// with the exception of zero param function, where a singular element vec
-    /// will be passed. In that case the single element is a null array to indicate
-    /// the batch's row count (so that the generative zero-argument function can know
-    /// the result array size).
-    pub fun: ScalarFunctionImplementation,
-}
-
-impl Debug for ScalarUDF {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("ScalarUDF")
-            .field("name", &self.name)
-            .field("signature", &self.signature)
-            .field("fun", &"<FUNC>")
-            .finish()
-    }
-}
-
-impl PartialEq for ScalarUDF {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name && self.signature == other.signature
-    }
-}
-
-impl std::hash::Hash for ScalarUDF {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-        self.signature.hash(state);
-    }
-}
-
-impl ScalarUDF {
-    /// Create a new ScalarUDF
-    pub fn new(
-        name: &str,
-        signature: &Signature,
-        return_type: &ReturnTypeFunction,
-        fun: &ScalarFunctionImplementation,
-    ) -> Self {
-        Self {
-            name: name.to_owned(),
-            signature: signature.clone(),
-            return_type: return_type.clone(),
-            fun: fun.clone(),
-        }
-    }
-
-    /// creates a logical expression with a call of the UDF
-    /// This utility allows using the UDF without requiring access to the registry.
-    pub fn call(&self, args: Vec<Expr>) -> Expr {
-        Expr::ScalarUDF {
-            fun: Arc::new(self.clone()),
-            args,
-        }
-    }
-}
+pub use datafusion_expr::ScalarUDF;
 
 /// Create a physical expression of the UDF.
 /// This function errors when `args`' can't be coerced to a valid argument type of the UDF.