You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/08 14:01:35 UTC

[arrow-datafusion] 03/05: fix imported types

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-udf-udaf-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit b0e68008815ece43f2c41439c8c12fd0b70caf34
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Feb 8 20:59:43 2022 +0800

    fix imported types
---
 datafusion-expr/src/lib.rs                 |  28 +++++++
 datafusion-expr/src/udaf.rs                | 115 +++++++++++++----------------
 datafusion-expr/src/udf.rs                 | 114 +++++++++++++---------------
 datafusion/src/physical_plan/aggregates.rs |   9 ---
 datafusion/src/physical_plan/functions.rs  |  15 ----
 datafusion/src/physical_plan/udaf.rs       |  67 +----------------
 6 files changed, 131 insertions(+), 217 deletions(-)

diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index 0afc910..6c553d1 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -25,6 +25,34 @@ mod udf;
 mod window_frame;
 mod window_function;
 
+use arrow::datatypes::DataType;
+use datafusion_common::{ColumnarValue, Result};
+use std::sync::Arc;
+
+/// Scalar function
+///
+/// The Fn param is the wrapped function but be aware that the function will
+/// be passed with the slice / vec of columnar values (either scalar or array)
+/// with the exception of zero param function, where a singular element vec
+/// will be passed. In that case the single element is a null array to indicate
+/// the batch's row count (so that the generative zero-argument function can know
+/// the result array size).
+pub type ScalarFunctionImplementation =
+  Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
+
+/// A function's return type
+pub type ReturnTypeFunction =
+  Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
+
+/// the implementation of an aggregate function
+pub type AccumulatorFunctionImplementation =
+  Arc<dyn Fn() -> Result<Box<dyn Accumulator>> + Send + Sync>;
+
+/// This signature corresponds to which types an aggregator serializes
+/// its state, given its return datatype.
+pub type StateTypeFunction =
+  Arc<dyn Fn(&DataType) -> Result<Arc<Vec<DataType>>> + Send + Sync>;
+
 pub use accumulator::Accumulator;
 pub use aggregate_function::AggregateFunction;
 pub use built_in_function::BuiltinScalarFunction;
diff --git a/datafusion-expr/src/udaf.rs b/datafusion-expr/src/udaf.rs
index 3fea4d9..142cfe1 100644
--- a/datafusion-expr/src/udaf.rs
+++ b/datafusion-expr/src/udaf.rs
@@ -17,91 +17,76 @@
 
 //! This module contains functions and structs supporting user-defined aggregate functions.
 
-use fmt::{Debug, Formatter};
-use std::any::Any;
-use std::fmt;
-
-use arrow::{
-    datatypes::Field,
-    datatypes::{DataType, Schema},
-};
-
-use crate::physical_plan::PhysicalExpr;
-use crate::{error::Result, logical_plan::Expr};
-
-use super::{
-    aggregates::AccumulatorFunctionImplementation,
-    aggregates::StateTypeFunction,
-    expressions::format_state_name,
-    functions::{ReturnTypeFunction, Signature},
-    type_coercion::coerce,
-    Accumulator, AggregateExpr,
+use crate::Expr;
+use crate::{
+  AccumulatorFunctionImplementation, ReturnTypeFunction, Signature, StateTypeFunction,
 };
+use std::fmt::{self, Debug, Formatter};
 use std::sync::Arc;
 
 /// Logical representation of a user-defined aggregate function (UDAF)
 /// A UDAF is different from a UDF in that it is stateful across batches.
 #[derive(Clone)]
 pub struct AggregateUDF {
-    /// name
-    pub name: String,
-    /// signature
-    pub signature: Signature,
-    /// Return type
-    pub return_type: ReturnTypeFunction,
-    /// actual implementation
-    pub accumulator: AccumulatorFunctionImplementation,
-    /// the accumulator's state's description as a function of the return type
-    pub state_type: StateTypeFunction,
+  /// name
+  pub name: String,
+  /// signature
+  pub signature: Signature,
+  /// Return type
+  pub return_type: ReturnTypeFunction,
+  /// actual implementation
+  pub accumulator: AccumulatorFunctionImplementation,
+  /// the accumulator's state's description as a function of the return type
+  pub state_type: StateTypeFunction,
 }
 
 impl Debug for AggregateUDF {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("AggregateUDF")
-            .field("name", &self.name)
-            .field("signature", &self.signature)
-            .field("fun", &"<FUNC>")
-            .finish()
-    }
+  fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+    f.debug_struct("AggregateUDF")
+      .field("name", &self.name)
+      .field("signature", &self.signature)
+      .field("fun", &"<FUNC>")
+      .finish()
+  }
 }
 
 impl PartialEq for AggregateUDF {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name && self.signature == other.signature
-    }
+  fn eq(&self, other: &Self) -> bool {
+    self.name == other.name && self.signature == other.signature
+  }
 }
 
 impl std::hash::Hash for AggregateUDF {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-        self.signature.hash(state);
-    }
+  fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+    self.name.hash(state);
+    self.signature.hash(state);
+  }
 }
 
 impl AggregateUDF {
-    /// Create a new AggregateUDF
-    pub fn new(
-        name: &str,
-        signature: &Signature,
-        return_type: &ReturnTypeFunction,
-        accumulator: &AccumulatorFunctionImplementation,
-        state_type: &StateTypeFunction,
-    ) -> Self {
-        Self {
-            name: name.to_owned(),
-            signature: signature.clone(),
-            return_type: return_type.clone(),
-            accumulator: accumulator.clone(),
-            state_type: state_type.clone(),
-        }
+  /// Create a new AggregateUDF
+  pub fn new(
+    name: &str,
+    signature: &Signature,
+    return_type: &ReturnTypeFunction,
+    accumulator: &AccumulatorFunctionImplementation,
+    state_type: &StateTypeFunction,
+  ) -> Self {
+    Self {
+      name: name.to_owned(),
+      signature: signature.clone(),
+      return_type: return_type.clone(),
+      accumulator: accumulator.clone(),
+      state_type: state_type.clone(),
     }
+  }
 
-    /// creates a logical expression with a call of the UDAF
-    /// This utility allows using the UDAF without requiring access to the registry.
-    pub fn call(&self, args: Vec<Expr>) -> Expr {
-        Expr::AggregateUDF {
-            fun: Arc::new(self.clone()),
-            args,
-        }
+  /// creates a logical expression with a call of the UDAF
+  /// This utility allows using the UDAF without requiring access to the registry.
+  pub fn call(&self, args: Vec<Expr>) -> Expr {
+    Expr::AggregateUDF {
+      fun: Arc::new(self.clone()),
+      args,
     }
+  }
 }
diff --git a/datafusion-expr/src/udf.rs b/datafusion-expr/src/udf.rs
index 9e7bebc..247d6a2 100644
--- a/datafusion-expr/src/udf.rs
+++ b/datafusion-expr/src/udf.rs
@@ -17,87 +17,77 @@
 
 //! UDF support
 
-use fmt::{Debug, Formatter};
+use crate::{Expr, ReturnTypeFunction, ScalarFunctionImplementation, Signature};
 use std::fmt;
-
-use arrow::datatypes::Schema;
-
-use crate::error::Result;
-use crate::{logical_plan::Expr, physical_plan::PhysicalExpr};
-
-use super::{
-    functions::{
-        ReturnTypeFunction, ScalarFunctionExpr, ScalarFunctionImplementation, Signature,
-    },
-    type_coercion::coerce,
-};
+use std::fmt::Debug;
+use std::fmt::Formatter;
 use std::sync::Arc;
 
 /// Logical representation of a UDF.
 #[derive(Clone)]
 pub struct ScalarUDF {
-    /// name
-    pub name: String,
-    /// signature
-    pub signature: Signature,
-    /// Return type
-    pub return_type: ReturnTypeFunction,
-    /// actual implementation
-    ///
-    /// The fn param is the wrapped function but be aware that the function will
-    /// be passed with the slice / vec of columnar values (either scalar or array)
-    /// with the exception of zero param function, where a singular element vec
-    /// will be passed. In that case the single element is a null array to indicate
-    /// the batch's row count (so that the generative zero-argument function can know
-    /// the result array size).
-    pub fun: ScalarFunctionImplementation,
+  /// name
+  pub name: String,
+  /// signature
+  pub signature: Signature,
+  /// Return type
+  pub return_type: ReturnTypeFunction,
+  /// actual implementation
+  ///
+  /// The fn param is the wrapped function but be aware that the function will
+  /// be passed with the slice / vec of columnar values (either scalar or array)
+  /// with the exception of zero param function, where a singular element vec
+  /// will be passed. In that case the single element is a null array to indicate
+  /// the batch's row count (so that the generative zero-argument function can know
+  /// the result array size).
+  pub fun: ScalarFunctionImplementation,
 }
 
 impl Debug for ScalarUDF {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("ScalarUDF")
-            .field("name", &self.name)
-            .field("signature", &self.signature)
-            .field("fun", &"<FUNC>")
-            .finish()
-    }
+  fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+    f.debug_struct("ScalarUDF")
+      .field("name", &self.name)
+      .field("signature", &self.signature)
+      .field("fun", &"<FUNC>")
+      .finish()
+  }
 }
 
 impl PartialEq for ScalarUDF {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name && self.signature == other.signature
-    }
+  fn eq(&self, other: &Self) -> bool {
+    self.name == other.name && self.signature == other.signature
+  }
 }
 
 impl std::hash::Hash for ScalarUDF {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-        self.signature.hash(state);
-    }
+  fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+    self.name.hash(state);
+    self.signature.hash(state);
+  }
 }
 
 impl ScalarUDF {
-    /// Create a new ScalarUDF
-    pub fn new(
-        name: &str,
-        signature: &Signature,
-        return_type: &ReturnTypeFunction,
-        fun: &ScalarFunctionImplementation,
-    ) -> Self {
-        Self {
-            name: name.to_owned(),
-            signature: signature.clone(),
-            return_type: return_type.clone(),
-            fun: fun.clone(),
-        }
+  /// Create a new ScalarUDF
+  pub fn new(
+    name: &str,
+    signature: &Signature,
+    return_type: &ReturnTypeFunction,
+    fun: &ScalarFunctionImplementation,
+  ) -> Self {
+    Self {
+      name: name.to_owned(),
+      signature: signature.clone(),
+      return_type: return_type.clone(),
+      fun: fun.clone(),
     }
+  }
 
-    /// creates a logical expression with a call of the UDF
-    /// This utility allows using the UDF without requiring access to the registry.
-    pub fn call(&self, args: Vec<Expr>) -> Expr {
-        Expr::ScalarUDF {
-            fun: Arc::new(self.clone()),
-            args,
-        }
+  /// creates a logical expression with a call of the UDF
+  /// This utility allows using the UDF without requiring access to the registry.
+  pub fn call(&self, args: Vec<Expr>) -> Expr {
+    Expr::ScalarUDF {
+      fun: Arc::new(self.clone()),
+      args,
     }
+  }
 }
diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs
index a1531d4..656b9c8 100644
--- a/datafusion/src/physical_plan/aggregates.rs
+++ b/datafusion/src/physical_plan/aggregates.rs
@@ -40,15 +40,6 @@ use expressions::{
 };
 use std::sync::Arc;
 
-/// the implementation of an aggregate function
-pub type AccumulatorFunctionImplementation =
-    Arc<dyn Fn() -> Result<Box<dyn Accumulator>> + Send + Sync>;
-
-/// This signature corresponds to which types an aggregator serializes
-/// its state, given its return datatype.
-pub type StateTypeFunction =
-    Arc<dyn Fn(&DataType) -> Result<Arc<Vec<DataType>>> + Send + Sync>;
-
 pub use datafusion_expr::AggregateFunction;
 
 /// Returns the datatype of the aggregate function.
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index 56e8f17..586b2ca 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -59,21 +59,6 @@ use std::{any::Any, fmt, sync::Arc};
 
 pub use datafusion_expr::{BuiltinScalarFunction, Signature, TypeSignature, Volatility};
 
-/// Scalar function
-///
-/// The Fn param is the wrapped function but be aware that the function will
-/// be passed with the slice / vec of columnar values (either scalar or array)
-/// with the exception of zero param function, where a singular element vec
-/// will be passed. In that case the single element is a null array to indicate
-/// the batch's row count (so that the generative zero-argument function can know
-/// the result array size).
-pub type ScalarFunctionImplementation =
-    Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
-
-/// A function's return type
-pub type ReturnTypeFunction =
-    Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
-
 macro_rules! make_utf8_to_return_type {
     ($FUNC:ident, $largeUtf8Type:expr, $utf8Type:expr) => {
         fn $FUNC(arg_type: &DataType, name: &str) -> Result<DataType> {
diff --git a/datafusion/src/physical_plan/udaf.rs b/datafusion/src/physical_plan/udaf.rs
index 0de696d..0e61cb4 100644
--- a/datafusion/src/physical_plan/udaf.rs
+++ b/datafusion/src/physical_plan/udaf.rs
@@ -39,72 +39,7 @@ use super::{
 };
 use std::sync::Arc;
 
-/// Logical representation of a user-defined aggregate function (UDAF)
-/// A UDAF is different from a UDF in that it is stateful across batches.
-#[derive(Clone)]
-pub struct AggregateUDF {
-    /// name
-    pub name: String,
-    /// signature
-    pub signature: Signature,
-    /// Return type
-    pub return_type: ReturnTypeFunction,
-    /// actual implementation
-    pub accumulator: AccumulatorFunctionImplementation,
-    /// the accumulator's state's description as a function of the return type
-    pub state_type: StateTypeFunction,
-}
-
-impl Debug for AggregateUDF {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("AggregateUDF")
-            .field("name", &self.name)
-            .field("signature", &self.signature)
-            .field("fun", &"<FUNC>")
-            .finish()
-    }
-}
-
-impl PartialEq for AggregateUDF {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name && self.signature == other.signature
-    }
-}
-
-impl std::hash::Hash for AggregateUDF {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-        self.signature.hash(state);
-    }
-}
-
-impl AggregateUDF {
-    /// Create a new AggregateUDF
-    pub fn new(
-        name: &str,
-        signature: &Signature,
-        return_type: &ReturnTypeFunction,
-        accumulator: &AccumulatorFunctionImplementation,
-        state_type: &StateTypeFunction,
-    ) -> Self {
-        Self {
-            name: name.to_owned(),
-            signature: signature.clone(),
-            return_type: return_type.clone(),
-            accumulator: accumulator.clone(),
-            state_type: state_type.clone(),
-        }
-    }
-
-    /// creates a logical expression with a call of the UDAF
-    /// This utility allows using the UDAF without requiring access to the registry.
-    pub fn call(&self, args: Vec<Expr>) -> Expr {
-        Expr::AggregateUDF {
-            fun: Arc::new(self.clone()),
-            args,
-        }
-    }
-}
+pub use datafusion_expr::AggregateUDF;
 
 /// Creates a physical expression of the UDAF, that includes all necessary type coercion.
 /// This function errors when `args`' can't be coerced to a valid argument type of the UDAF.