You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/08 13:00:00 UTC
[arrow-datafusion] 02/02: fix imported types
This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch move-udf-udaf-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit d5eb258049e559d3d93d8f1ff40c473234f90f37
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Feb 8 20:59:43 2022 +0800
fix imported types
---
datafusion-expr/src/lib.rs | 28 +++++++
datafusion-expr/src/udaf.rs | 115 +++++++++++++----------------
datafusion-expr/src/udf.rs | 114 +++++++++++++---------------
datafusion/src/physical_plan/aggregates.rs | 9 ---
datafusion/src/physical_plan/functions.rs | 15 ----
datafusion/src/physical_plan/udaf.rs | 67 +----------------
6 files changed, 131 insertions(+), 217 deletions(-)
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index 9e3d761..69d5a00 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -27,6 +27,34 @@ mod udf;
mod window_frame;
mod window_function;
+use arrow::datatypes::DataType;
+use datafusion_common::Result;
+use std::sync::Arc;
+
+/// Scalar function
+///
+/// The Fn param is the wrapped function, but be aware that the function will
+/// be passed a slice / vec of columnar values (either scalar or array),
+/// with the exception of zero-param functions, where a single-element vec
+/// will be passed. In that case the single element is a null array to indicate
+/// the batch's row count (so that the generative zero-argument function can know
+/// the result array size).
+pub type ScalarFunctionImplementation =
+ Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
+
+/// A function's return type
+pub type ReturnTypeFunction =
+ Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
+
+/// the implementation of an aggregate function
+pub type AccumulatorFunctionImplementation =
+ Arc<dyn Fn() -> Result<Box<dyn Accumulator>> + Send + Sync>;
+
+/// This signature corresponds to which types an aggregator serializes
+/// its state, given its return datatype.
+pub type StateTypeFunction =
+ Arc<dyn Fn(&DataType) -> Result<Arc<Vec<DataType>>> + Send + Sync>;
+
pub use accumulator::Accumulator;
pub use aggregate_function::AggregateFunction;
pub use built_in_function::BuiltinScalarFunction;
diff --git a/datafusion-expr/src/udaf.rs b/datafusion-expr/src/udaf.rs
index 3fea4d9..142cfe1 100644
--- a/datafusion-expr/src/udaf.rs
+++ b/datafusion-expr/src/udaf.rs
@@ -17,91 +17,76 @@
//! This module contains functions and structs supporting user-defined aggregate functions.
-use fmt::{Debug, Formatter};
-use std::any::Any;
-use std::fmt;
-
-use arrow::{
- datatypes::Field,
- datatypes::{DataType, Schema},
-};
-
-use crate::physical_plan::PhysicalExpr;
-use crate::{error::Result, logical_plan::Expr};
-
-use super::{
- aggregates::AccumulatorFunctionImplementation,
- aggregates::StateTypeFunction,
- expressions::format_state_name,
- functions::{ReturnTypeFunction, Signature},
- type_coercion::coerce,
- Accumulator, AggregateExpr,
+use crate::Expr;
+use crate::{
+ AccumulatorFunctionImplementation, ReturnTypeFunction, Signature, StateTypeFunction,
};
+use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
/// Logical representation of a user-defined aggregate function (UDAF)
/// A UDAF is different from a UDF in that it is stateful across batches.
#[derive(Clone)]
pub struct AggregateUDF {
- /// name
- pub name: String,
- /// signature
- pub signature: Signature,
- /// Return type
- pub return_type: ReturnTypeFunction,
- /// actual implementation
- pub accumulator: AccumulatorFunctionImplementation,
- /// the accumulator's state's description as a function of the return type
- pub state_type: StateTypeFunction,
+ /// name
+ pub name: String,
+ /// signature
+ pub signature: Signature,
+ /// Return type
+ pub return_type: ReturnTypeFunction,
+ /// actual implementation
+ pub accumulator: AccumulatorFunctionImplementation,
+ /// the accumulator's state's description as a function of the return type
+ pub state_type: StateTypeFunction,
}
impl Debug for AggregateUDF {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- f.debug_struct("AggregateUDF")
- .field("name", &self.name)
- .field("signature", &self.signature)
- .field("fun", &"<FUNC>")
- .finish()
- }
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ f.debug_struct("AggregateUDF")
+ .field("name", &self.name)
+ .field("signature", &self.signature)
+ .field("fun", &"<FUNC>")
+ .finish()
+ }
}
impl PartialEq for AggregateUDF {
- fn eq(&self, other: &Self) -> bool {
- self.name == other.name && self.signature == other.signature
- }
+ fn eq(&self, other: &Self) -> bool {
+ self.name == other.name && self.signature == other.signature
+ }
}
impl std::hash::Hash for AggregateUDF {
- fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
- self.name.hash(state);
- self.signature.hash(state);
- }
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ self.name.hash(state);
+ self.signature.hash(state);
+ }
}
impl AggregateUDF {
- /// Create a new AggregateUDF
- pub fn new(
- name: &str,
- signature: &Signature,
- return_type: &ReturnTypeFunction,
- accumulator: &AccumulatorFunctionImplementation,
- state_type: &StateTypeFunction,
- ) -> Self {
- Self {
- name: name.to_owned(),
- signature: signature.clone(),
- return_type: return_type.clone(),
- accumulator: accumulator.clone(),
- state_type: state_type.clone(),
- }
+ /// Create a new AggregateUDF
+ pub fn new(
+ name: &str,
+ signature: &Signature,
+ return_type: &ReturnTypeFunction,
+ accumulator: &AccumulatorFunctionImplementation,
+ state_type: &StateTypeFunction,
+ ) -> Self {
+ Self {
+ name: name.to_owned(),
+ signature: signature.clone(),
+ return_type: return_type.clone(),
+ accumulator: accumulator.clone(),
+ state_type: state_type.clone(),
}
+ }
- /// creates a logical expression with a call of the UDAF
- /// This utility allows using the UDAF without requiring access to the registry.
- pub fn call(&self, args: Vec<Expr>) -> Expr {
- Expr::AggregateUDF {
- fun: Arc::new(self.clone()),
- args,
- }
+ /// creates a logical expression with a call of the UDAF
+ /// This utility allows using the UDAF without requiring access to the registry.
+ pub fn call(&self, args: Vec<Expr>) -> Expr {
+ Expr::AggregateUDF {
+ fun: Arc::new(self.clone()),
+ args,
}
+ }
}
diff --git a/datafusion-expr/src/udf.rs b/datafusion-expr/src/udf.rs
index 9e7bebc..247d6a2 100644
--- a/datafusion-expr/src/udf.rs
+++ b/datafusion-expr/src/udf.rs
@@ -17,87 +17,77 @@
//! UDF support
-use fmt::{Debug, Formatter};
+use crate::{Expr, ReturnTypeFunction, ScalarFunctionImplementation, Signature};
use std::fmt;
-
-use arrow::datatypes::Schema;
-
-use crate::error::Result;
-use crate::{logical_plan::Expr, physical_plan::PhysicalExpr};
-
-use super::{
- functions::{
- ReturnTypeFunction, ScalarFunctionExpr, ScalarFunctionImplementation, Signature,
- },
- type_coercion::coerce,
-};
+use std::fmt::Debug;
+use std::fmt::Formatter;
use std::sync::Arc;
/// Logical representation of a UDF.
#[derive(Clone)]
pub struct ScalarUDF {
- /// name
- pub name: String,
- /// signature
- pub signature: Signature,
- /// Return type
- pub return_type: ReturnTypeFunction,
- /// actual implementation
- ///
- /// The fn param is the wrapped function but be aware that the function will
- /// be passed with the slice / vec of columnar values (either scalar or array)
- /// with the exception of zero param function, where a singular element vec
- /// will be passed. In that case the single element is a null array to indicate
- /// the batch's row count (so that the generative zero-argument function can know
- /// the result array size).
- pub fun: ScalarFunctionImplementation,
+ /// name
+ pub name: String,
+ /// signature
+ pub signature: Signature,
+ /// Return type
+ pub return_type: ReturnTypeFunction,
+ /// actual implementation
+ ///
+ /// The fn param is the wrapped function, but be aware that the function will
+ /// be passed a slice / vec of columnar values (either scalar or array),
+ /// with the exception of zero-param functions, where a single-element vec
+ /// will be passed. In that case the single element is a null array to indicate
+ /// the batch's row count (so that the generative zero-argument function can know
+ /// the result array size).
+ pub fun: ScalarFunctionImplementation,
}
impl Debug for ScalarUDF {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- f.debug_struct("ScalarUDF")
- .field("name", &self.name)
- .field("signature", &self.signature)
- .field("fun", &"<FUNC>")
- .finish()
- }
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ f.debug_struct("ScalarUDF")
+ .field("name", &self.name)
+ .field("signature", &self.signature)
+ .field("fun", &"<FUNC>")
+ .finish()
+ }
}
impl PartialEq for ScalarUDF {
- fn eq(&self, other: &Self) -> bool {
- self.name == other.name && self.signature == other.signature
- }
+ fn eq(&self, other: &Self) -> bool {
+ self.name == other.name && self.signature == other.signature
+ }
}
impl std::hash::Hash for ScalarUDF {
- fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
- self.name.hash(state);
- self.signature.hash(state);
- }
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ self.name.hash(state);
+ self.signature.hash(state);
+ }
}
impl ScalarUDF {
- /// Create a new ScalarUDF
- pub fn new(
- name: &str,
- signature: &Signature,
- return_type: &ReturnTypeFunction,
- fun: &ScalarFunctionImplementation,
- ) -> Self {
- Self {
- name: name.to_owned(),
- signature: signature.clone(),
- return_type: return_type.clone(),
- fun: fun.clone(),
- }
+ /// Create a new ScalarUDF
+ pub fn new(
+ name: &str,
+ signature: &Signature,
+ return_type: &ReturnTypeFunction,
+ fun: &ScalarFunctionImplementation,
+ ) -> Self {
+ Self {
+ name: name.to_owned(),
+ signature: signature.clone(),
+ return_type: return_type.clone(),
+ fun: fun.clone(),
}
+ }
- /// creates a logical expression with a call of the UDF
- /// This utility allows using the UDF without requiring access to the registry.
- pub fn call(&self, args: Vec<Expr>) -> Expr {
- Expr::ScalarUDF {
- fun: Arc::new(self.clone()),
- args,
- }
+ /// creates a logical expression with a call of the UDF
+ /// This utility allows using the UDF without requiring access to the registry.
+ pub fn call(&self, args: Vec<Expr>) -> Expr {
+ Expr::ScalarUDF {
+ fun: Arc::new(self.clone()),
+ args,
}
+ }
}
diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs
index a1531d4..656b9c8 100644
--- a/datafusion/src/physical_plan/aggregates.rs
+++ b/datafusion/src/physical_plan/aggregates.rs
@@ -40,15 +40,6 @@ use expressions::{
};
use std::sync::Arc;
-/// the implementation of an aggregate function
-pub type AccumulatorFunctionImplementation =
- Arc<dyn Fn() -> Result<Box<dyn Accumulator>> + Send + Sync>;
-
-/// This signature corresponds to which types an aggregator serializes
-/// its state, given its return datatype.
-pub type StateTypeFunction =
- Arc<dyn Fn(&DataType) -> Result<Arc<Vec<DataType>>> + Send + Sync>;
-
pub use datafusion_expr::AggregateFunction;
/// Returns the datatype of the aggregate function.
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index 56e8f17..586b2ca 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -59,21 +59,6 @@ use std::{any::Any, fmt, sync::Arc};
pub use datafusion_expr::{BuiltinScalarFunction, Signature, TypeSignature, Volatility};
-/// Scalar function
-///
-/// The Fn param is the wrapped function but be aware that the function will
-/// be passed with the slice / vec of columnar values (either scalar or array)
-/// with the exception of zero param function, where a singular element vec
-/// will be passed. In that case the single element is a null array to indicate
-/// the batch's row count (so that the generative zero-argument function can know
-/// the result array size).
-pub type ScalarFunctionImplementation =
- Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
-
-/// A function's return type
-pub type ReturnTypeFunction =
- Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
-
macro_rules! make_utf8_to_return_type {
($FUNC:ident, $largeUtf8Type:expr, $utf8Type:expr) => {
fn $FUNC(arg_type: &DataType, name: &str) -> Result<DataType> {
diff --git a/datafusion/src/physical_plan/udaf.rs b/datafusion/src/physical_plan/udaf.rs
index 0de696d..0e61cb4 100644
--- a/datafusion/src/physical_plan/udaf.rs
+++ b/datafusion/src/physical_plan/udaf.rs
@@ -39,72 +39,7 @@ use super::{
};
use std::sync::Arc;
-/// Logical representation of a user-defined aggregate function (UDAF)
-/// A UDAF is different from a UDF in that it is stateful across batches.
-#[derive(Clone)]
-pub struct AggregateUDF {
- /// name
- pub name: String,
- /// signature
- pub signature: Signature,
- /// Return type
- pub return_type: ReturnTypeFunction,
- /// actual implementation
- pub accumulator: AccumulatorFunctionImplementation,
- /// the accumulator's state's description as a function of the return type
- pub state_type: StateTypeFunction,
-}
-
-impl Debug for AggregateUDF {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- f.debug_struct("AggregateUDF")
- .field("name", &self.name)
- .field("signature", &self.signature)
- .field("fun", &"<FUNC>")
- .finish()
- }
-}
-
-impl PartialEq for AggregateUDF {
- fn eq(&self, other: &Self) -> bool {
- self.name == other.name && self.signature == other.signature
- }
-}
-
-impl std::hash::Hash for AggregateUDF {
- fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
- self.name.hash(state);
- self.signature.hash(state);
- }
-}
-
-impl AggregateUDF {
- /// Create a new AggregateUDF
- pub fn new(
- name: &str,
- signature: &Signature,
- return_type: &ReturnTypeFunction,
- accumulator: &AccumulatorFunctionImplementation,
- state_type: &StateTypeFunction,
- ) -> Self {
- Self {
- name: name.to_owned(),
- signature: signature.clone(),
- return_type: return_type.clone(),
- accumulator: accumulator.clone(),
- state_type: state_type.clone(),
- }
- }
-
- /// creates a logical expression with a call of the UDAF
- /// This utility allows using the UDAF without requiring access to the registry.
- pub fn call(&self, args: Vec<Expr>) -> Expr {
- Expr::AggregateUDF {
- fun: Arc::new(self.clone()),
- args,
- }
- }
-}
+pub use datafusion_expr::AggregateUDF;
/// Creates a physical expression of the UDAF, that includes all necessary type coercion.
/// This function errors when `args`' can't be coerced to a valid argument type of the UDAF.