You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/08 14:01:34 UTC
[arrow-datafusion] 02/05: udf and udaf
This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch move-udf-udaf-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit dea18674f46540d21ae39825a1406689b97007e4
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Feb 8 20:53:42 2022 +0800
udf and udaf
---
datafusion-expr/src/{lib.rs => expr.rs} | 16 -----
datafusion-expr/src/lib.rs | 4 ++
.../udf.rs => datafusion-expr/src/udaf.rs | 83 +++++++++-------------
.../physical_plan => datafusion-expr/src}/udf.rs | 23 ------
datafusion/src/physical_plan/udf.rs | 69 +-----------------
5 files changed, 37 insertions(+), 158 deletions(-)
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/expr.rs
similarity index 61%
copy from datafusion-expr/src/lib.rs
copy to datafusion-expr/src/expr.rs
index eb86b0b..b248758 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/expr.rs
@@ -14,19 +14,3 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-
-mod accumulator;
-mod aggregate_function;
-mod built_in_function;
-mod operator;
-mod signature;
-mod window_frame;
-mod window_function;
-
-pub use accumulator::Accumulator;
-pub use aggregate_function::AggregateFunction;
-pub use built_in_function::BuiltinScalarFunction;
-pub use operator::Operator;
-pub use signature::{Signature, TypeSignature, Volatility};
-pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
-pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index eb86b0b..0afc910 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -20,6 +20,8 @@ mod aggregate_function;
mod built_in_function;
mod operator;
mod signature;
+mod udaf;
+mod udf;
mod window_frame;
mod window_function;
@@ -28,5 +30,7 @@ pub use aggregate_function::AggregateFunction;
pub use built_in_function::BuiltinScalarFunction;
pub use operator::Operator;
pub use signature::{Signature, TypeSignature, Volatility};
+pub use udaf::AggregateUDF;
+pub use udf::ScalarUDF;
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion-expr/src/udaf.rs
similarity index 50%
copy from datafusion/src/physical_plan/udf.rs
copy to datafusion-expr/src/udaf.rs
index 7355746..3fea4d9 100644
--- a/datafusion/src/physical_plan/udf.rs
+++ b/datafusion-expr/src/udaf.rs
@@ -15,27 +15,34 @@
// specific language governing permissions and limitations
// under the License.
-//! UDF support
+//! This module contains functions and structs supporting user-defined aggregate functions.
use fmt::{Debug, Formatter};
+use std::any::Any;
use std::fmt;
-use arrow::datatypes::Schema;
+use arrow::{
+ datatypes::Field,
+ datatypes::{DataType, Schema},
+};
-use crate::error::Result;
-use crate::{logical_plan::Expr, physical_plan::PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
+use crate::{error::Result, logical_plan::Expr};
use super::{
- functions::{
- ReturnTypeFunction, ScalarFunctionExpr, ScalarFunctionImplementation, Signature,
- },
+ aggregates::AccumulatorFunctionImplementation,
+ aggregates::StateTypeFunction,
+ expressions::format_state_name,
+ functions::{ReturnTypeFunction, Signature},
type_coercion::coerce,
+ Accumulator, AggregateExpr,
};
use std::sync::Arc;
-/// Logical representation of a UDF.
+/// Logical representation of a user-defined aggregate function (UDAF)
+/// A UDAF is different from a UDF in that it is stateful across batches.
#[derive(Clone)]
-pub struct ScalarUDF {
+pub struct AggregateUDF {
/// name
pub name: String,
/// signature
@@ -43,19 +50,14 @@ pub struct ScalarUDF {
/// Return type
pub return_type: ReturnTypeFunction,
/// actual implementation
- ///
- /// The fn param is the wrapped function but be aware that the function will
- /// be passed with the slice / vec of columnar values (either scalar or array)
- /// with the exception of zero param function, where a singular element vec
- /// will be passed. In that case the single element is a null array to indicate
- /// the batch's row count (so that the generative zero-argument function can know
- /// the result array size).
- pub fun: ScalarFunctionImplementation,
+ pub accumulator: AccumulatorFunctionImplementation,
+ /// the accumulator's state's description as a function of the return type
+ pub state_type: StateTypeFunction,
}
-impl Debug for ScalarUDF {
+impl Debug for AggregateUDF {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- f.debug_struct("ScalarUDF")
+ f.debug_struct("AggregateUDF")
.field("name", &self.name)
.field("signature", &self.signature)
.field("fun", &"<FUNC>")
@@ -63,64 +65,43 @@ impl Debug for ScalarUDF {
}
}
-impl PartialEq for ScalarUDF {
+impl PartialEq for AggregateUDF {
fn eq(&self, other: &Self) -> bool {
self.name == other.name && self.signature == other.signature
}
}
-impl std::hash::Hash for ScalarUDF {
+impl std::hash::Hash for AggregateUDF {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.name.hash(state);
self.signature.hash(state);
}
}
-impl ScalarUDF {
- /// Create a new ScalarUDF
+impl AggregateUDF {
+ /// Create a new AggregateUDF
pub fn new(
name: &str,
signature: &Signature,
return_type: &ReturnTypeFunction,
- fun: &ScalarFunctionImplementation,
+ accumulator: &AccumulatorFunctionImplementation,
+ state_type: &StateTypeFunction,
) -> Self {
Self {
name: name.to_owned(),
signature: signature.clone(),
return_type: return_type.clone(),
- fun: fun.clone(),
+ accumulator: accumulator.clone(),
+ state_type: state_type.clone(),
}
}
- /// creates a logical expression with a call of the UDF
- /// This utility allows using the UDF without requiring access to the registry.
+ /// creates a logical expression with a call of the UDAF
+ /// This utility allows using the UDAF without requiring access to the registry.
pub fn call(&self, args: Vec<Expr>) -> Expr {
- Expr::ScalarUDF {
+ Expr::AggregateUDF {
fun: Arc::new(self.clone()),
args,
}
}
}
-
-/// Create a physical expression of the UDF.
-/// This function errors when `args`' can't be coerced to a valid argument type of the UDF.
-pub fn create_physical_expr(
- fun: &ScalarUDF,
- input_phy_exprs: &[Arc<dyn PhysicalExpr>],
- input_schema: &Schema,
-) -> Result<Arc<dyn PhysicalExpr>> {
- // coerce
- let coerced_phy_exprs = coerce(input_phy_exprs, input_schema, &fun.signature)?;
-
- let coerced_exprs_types = coerced_phy_exprs
- .iter()
- .map(|e| e.data_type(input_schema))
- .collect::<Result<Vec<_>>>()?;
-
- Ok(Arc::new(ScalarFunctionExpr::new(
- &fun.name,
- fun.fun.clone(),
- coerced_phy_exprs,
- (fun.return_type)(&coerced_exprs_types)?.as_ref(),
- )))
-}
diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion-expr/src/udf.rs
similarity index 81%
copy from datafusion/src/physical_plan/udf.rs
copy to datafusion-expr/src/udf.rs
index 7355746..9e7bebc 100644
--- a/datafusion/src/physical_plan/udf.rs
+++ b/datafusion-expr/src/udf.rs
@@ -101,26 +101,3 @@ impl ScalarUDF {
}
}
}
-
-/// Create a physical expression of the UDF.
-/// This function errors when `args`' can't be coerced to a valid argument type of the UDF.
-pub fn create_physical_expr(
- fun: &ScalarUDF,
- input_phy_exprs: &[Arc<dyn PhysicalExpr>],
- input_schema: &Schema,
-) -> Result<Arc<dyn PhysicalExpr>> {
- // coerce
- let coerced_phy_exprs = coerce(input_phy_exprs, input_schema, &fun.signature)?;
-
- let coerced_exprs_types = coerced_phy_exprs
- .iter()
- .map(|e| e.data_type(input_schema))
- .collect::<Result<Vec<_>>>()?;
-
- Ok(Arc::new(ScalarFunctionExpr::new(
- &fun.name,
- fun.fun.clone(),
- coerced_phy_exprs,
- (fun.return_type)(&coerced_exprs_types)?.as_ref(),
- )))
-}
diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion/src/physical_plan/udf.rs
index 7355746..85e6b02 100644
--- a/datafusion/src/physical_plan/udf.rs
+++ b/datafusion/src/physical_plan/udf.rs
@@ -33,74 +33,7 @@ use super::{
};
use std::sync::Arc;
-/// Logical representation of a UDF.
-#[derive(Clone)]
-pub struct ScalarUDF {
- /// name
- pub name: String,
- /// signature
- pub signature: Signature,
- /// Return type
- pub return_type: ReturnTypeFunction,
- /// actual implementation
- ///
- /// The fn param is the wrapped function but be aware that the function will
- /// be passed with the slice / vec of columnar values (either scalar or array)
- /// with the exception of zero param function, where a singular element vec
- /// will be passed. In that case the single element is a null array to indicate
- /// the batch's row count (so that the generative zero-argument function can know
- /// the result array size).
- pub fun: ScalarFunctionImplementation,
-}
-
-impl Debug for ScalarUDF {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- f.debug_struct("ScalarUDF")
- .field("name", &self.name)
- .field("signature", &self.signature)
- .field("fun", &"<FUNC>")
- .finish()
- }
-}
-
-impl PartialEq for ScalarUDF {
- fn eq(&self, other: &Self) -> bool {
- self.name == other.name && self.signature == other.signature
- }
-}
-
-impl std::hash::Hash for ScalarUDF {
- fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
- self.name.hash(state);
- self.signature.hash(state);
- }
-}
-
-impl ScalarUDF {
- /// Create a new ScalarUDF
- pub fn new(
- name: &str,
- signature: &Signature,
- return_type: &ReturnTypeFunction,
- fun: &ScalarFunctionImplementation,
- ) -> Self {
- Self {
- name: name.to_owned(),
- signature: signature.clone(),
- return_type: return_type.clone(),
- fun: fun.clone(),
- }
- }
-
- /// creates a logical expression with a call of the UDF
- /// This utility allows using the UDF without requiring access to the registry.
- pub fn call(&self, args: Vec<Expr>) -> Expr {
- Expr::ScalarUDF {
- fun: Arc::new(self.clone()),
- args,
- }
- }
-}
+pub use datafusion_expr::ScalarUDF;
/// Create a physical expression of the UDF.
/// This function errors when `args`' can't be coerced to a valid argument type of the UDF.