You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/08 14:01:32 UTC

[arrow-datafusion] branch move-udf-udaf-expr updated (2547502 -> 6ca6163)

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a change to branch move-udf-udaf-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git.


 discard 2547502  fix compile err
 discard d5eb258  fix imported types
 discard 310927b  udf and udaf
 discard 3540d00  pyarrow
     add 4b68273  move built-in scalar functions (#1764)
     new 6c312cc  pyarrow
     new dea1867  udf and udaf
     new b0e6800  fix imported types
     new 62c9062  split expr type and null info to be expr-schemable
     new 6ca6163  move accumulator and columnar value

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (2547502)
            \
             N -- N -- N   refs/heads/move-udf-udaf-expr (6ca6163)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 datafusion-expr/Cargo.toml                         |   1 -
 datafusion-expr/src/expr.rs                        | 892 ---------------------
 datafusion-expr/src/lib.rs                         |  12 +-
 datafusion-expr/src/udaf.rs                        |  94 +--
 datafusion-expr/src/udf.rs                         |  98 +--
 datafusion-expr/src/window_function.rs             |  78 --
 datafusion/src/logical_plan/builder.rs             |   1 +
 datafusion/src/logical_plan/expr.rs                | 737 ++++++++++++++++-
 datafusion/src/logical_plan/expr_schema.rs         | 180 +++++
 datafusion/src/logical_plan/mod.rs                 |   2 +
 .../src/optimizer/common_subexpr_eliminate.rs      |   2 +-
 datafusion/src/optimizer/simplify_expressions.rs   |   8 +-
 datafusion/tests/simplification.rs                 |   1 +
 13 files changed, 1022 insertions(+), 1084 deletions(-)
 create mode 100644 datafusion/src/logical_plan/expr_schema.rs

[arrow-datafusion] 05/05: move accumulator and columnar value

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-udf-udaf-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 6ca6163923876ac38340c2019102fbb699388642
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 13:29:20 2022 +0800

    move accumulator and columnar value
---
 datafusion-expr/src/lib.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index 6c553d1..2dc9c17 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -18,6 +18,7 @@
 mod accumulator;
 mod aggregate_function;
 mod built_in_function;
+mod columnar_value;
 mod operator;
 mod signature;
 mod udaf;
@@ -26,7 +27,7 @@ mod window_frame;
 mod window_function;
 
 use arrow::datatypes::DataType;
-use datafusion_common::{ColumnarValue, Result};
+use datafusion_common::Result;
 use std::sync::Arc;
 
 /// Scalar function
@@ -56,6 +57,7 @@ pub type StateTypeFunction =
 pub use accumulator::Accumulator;
 pub use aggregate_function::AggregateFunction;
 pub use built_in_function::BuiltinScalarFunction;
+pub use columnar_value::ColumnarValue;
 pub use operator::Operator;
 pub use signature::{Signature, TypeSignature, Volatility};
 pub use udaf::AggregateUDF;

[arrow-datafusion] 02/05: udf and udaf

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-udf-udaf-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit dea18674f46540d21ae39825a1406689b97007e4
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Feb 8 20:53:42 2022 +0800

    udf and udaf
---
 datafusion-expr/src/{lib.rs => expr.rs}            | 16 -----
 datafusion-expr/src/lib.rs                         |  4 ++
 .../udf.rs => datafusion-expr/src/udaf.rs          | 83 +++++++++-------------
 .../physical_plan => datafusion-expr/src}/udf.rs   | 23 ------
 datafusion/src/physical_plan/udf.rs                | 69 +-----------------
 5 files changed, 37 insertions(+), 158 deletions(-)

diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/expr.rs
similarity index 61%
copy from datafusion-expr/src/lib.rs
copy to datafusion-expr/src/expr.rs
index eb86b0b..b248758 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/expr.rs
@@ -14,19 +14,3 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
-mod accumulator;
-mod aggregate_function;
-mod built_in_function;
-mod operator;
-mod signature;
-mod window_frame;
-mod window_function;
-
-pub use accumulator::Accumulator;
-pub use aggregate_function::AggregateFunction;
-pub use built_in_function::BuiltinScalarFunction;
-pub use operator::Operator;
-pub use signature::{Signature, TypeSignature, Volatility};
-pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
-pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index eb86b0b..0afc910 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -20,6 +20,8 @@ mod aggregate_function;
 mod built_in_function;
 mod operator;
 mod signature;
+mod udaf;
+mod udf;
 mod window_frame;
 mod window_function;
 
@@ -28,5 +30,7 @@ pub use aggregate_function::AggregateFunction;
 pub use built_in_function::BuiltinScalarFunction;
 pub use operator::Operator;
 pub use signature::{Signature, TypeSignature, Volatility};
+pub use udaf::AggregateUDF;
+pub use udf::ScalarUDF;
 pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
 pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion-expr/src/udaf.rs
similarity index 50%
copy from datafusion/src/physical_plan/udf.rs
copy to datafusion-expr/src/udaf.rs
index 7355746..3fea4d9 100644
--- a/datafusion/src/physical_plan/udf.rs
+++ b/datafusion-expr/src/udaf.rs
@@ -15,27 +15,34 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! UDF support
+//! This module contains functions and structs supporting user-defined aggregate functions.
 
 use fmt::{Debug, Formatter};
+use std::any::Any;
 use std::fmt;
 
-use arrow::datatypes::Schema;
+use arrow::{
+    datatypes::Field,
+    datatypes::{DataType, Schema},
+};
 
-use crate::error::Result;
-use crate::{logical_plan::Expr, physical_plan::PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
+use crate::{error::Result, logical_plan::Expr};
 
 use super::{
-    functions::{
-        ReturnTypeFunction, ScalarFunctionExpr, ScalarFunctionImplementation, Signature,
-    },
+    aggregates::AccumulatorFunctionImplementation,
+    aggregates::StateTypeFunction,
+    expressions::format_state_name,
+    functions::{ReturnTypeFunction, Signature},
     type_coercion::coerce,
+    Accumulator, AggregateExpr,
 };
 use std::sync::Arc;
 
-/// Logical representation of a UDF.
+/// Logical representation of a user-defined aggregate function (UDAF).
+/// A UDAF is different from a UDF in that it is stateful across batches.
 #[derive(Clone)]
-pub struct ScalarUDF {
+pub struct AggregateUDF {
     /// name
     pub name: String,
     /// signature
@@ -43,19 +50,14 @@ pub struct ScalarUDF {
     /// Return type
     pub return_type: ReturnTypeFunction,
     /// actual implementation
-    ///
-    /// The fn param is the wrapped function but be aware that the function will
-    /// be passed with the slice / vec of columnar values (either scalar or array)
-    /// with the exception of zero param function, where a singular element vec
-    /// will be passed. In that case the single element is a null array to indicate
-    /// the batch's row count (so that the generative zero-argument function can know
-    /// the result array size).
-    pub fun: ScalarFunctionImplementation,
+    pub accumulator: AccumulatorFunctionImplementation,
+    /// the accumulator's state's description as a function of the return type
+    pub state_type: StateTypeFunction,
 }
 
-impl Debug for ScalarUDF {
+impl Debug for AggregateUDF {
     fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("ScalarUDF")
+        f.debug_struct("AggregateUDF")
             .field("name", &self.name)
             .field("signature", &self.signature)
             .field("fun", &"<FUNC>")
@@ -63,64 +65,43 @@ impl Debug for ScalarUDF {
     }
 }
 
-impl PartialEq for ScalarUDF {
+impl PartialEq for AggregateUDF {
     fn eq(&self, other: &Self) -> bool {
         self.name == other.name && self.signature == other.signature
     }
 }
 
-impl std::hash::Hash for ScalarUDF {
+impl std::hash::Hash for AggregateUDF {
     fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
         self.name.hash(state);
         self.signature.hash(state);
     }
 }
 
-impl ScalarUDF {
-    /// Create a new ScalarUDF
+impl AggregateUDF {
+    /// Create a new AggregateUDF
     pub fn new(
         name: &str,
         signature: &Signature,
         return_type: &ReturnTypeFunction,
-        fun: &ScalarFunctionImplementation,
+        accumulator: &AccumulatorFunctionImplementation,
+        state_type: &StateTypeFunction,
     ) -> Self {
         Self {
             name: name.to_owned(),
             signature: signature.clone(),
             return_type: return_type.clone(),
-            fun: fun.clone(),
+            accumulator: accumulator.clone(),
+            state_type: state_type.clone(),
         }
     }
 
-    /// creates a logical expression with a call of the UDF
-    /// This utility allows using the UDF without requiring access to the registry.
+    /// creates a logical expression with a call of the UDAF
+    /// This utility allows using the UDAF without requiring access to the registry.
     pub fn call(&self, args: Vec<Expr>) -> Expr {
-        Expr::ScalarUDF {
+        Expr::AggregateUDF {
             fun: Arc::new(self.clone()),
             args,
         }
     }
 }
-
-/// Create a physical expression of the UDF.
-/// This function errors when `args`' can't be coerced to a valid argument type of the UDF.
-pub fn create_physical_expr(
-    fun: &ScalarUDF,
-    input_phy_exprs: &[Arc<dyn PhysicalExpr>],
-    input_schema: &Schema,
-) -> Result<Arc<dyn PhysicalExpr>> {
-    // coerce
-    let coerced_phy_exprs = coerce(input_phy_exprs, input_schema, &fun.signature)?;
-
-    let coerced_exprs_types = coerced_phy_exprs
-        .iter()
-        .map(|e| e.data_type(input_schema))
-        .collect::<Result<Vec<_>>>()?;
-
-    Ok(Arc::new(ScalarFunctionExpr::new(
-        &fun.name,
-        fun.fun.clone(),
-        coerced_phy_exprs,
-        (fun.return_type)(&coerced_exprs_types)?.as_ref(),
-    )))
-}
diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion-expr/src/udf.rs
similarity index 81%
copy from datafusion/src/physical_plan/udf.rs
copy to datafusion-expr/src/udf.rs
index 7355746..9e7bebc 100644
--- a/datafusion/src/physical_plan/udf.rs
+++ b/datafusion-expr/src/udf.rs
@@ -101,26 +101,3 @@ impl ScalarUDF {
         }
     }
 }
-
-/// Create a physical expression of the UDF.
-/// This function errors when `args`' can't be coerced to a valid argument type of the UDF.
-pub fn create_physical_expr(
-    fun: &ScalarUDF,
-    input_phy_exprs: &[Arc<dyn PhysicalExpr>],
-    input_schema: &Schema,
-) -> Result<Arc<dyn PhysicalExpr>> {
-    // coerce
-    let coerced_phy_exprs = coerce(input_phy_exprs, input_schema, &fun.signature)?;
-
-    let coerced_exprs_types = coerced_phy_exprs
-        .iter()
-        .map(|e| e.data_type(input_schema))
-        .collect::<Result<Vec<_>>>()?;
-
-    Ok(Arc::new(ScalarFunctionExpr::new(
-        &fun.name,
-        fun.fun.clone(),
-        coerced_phy_exprs,
-        (fun.return_type)(&coerced_exprs_types)?.as_ref(),
-    )))
-}
diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion/src/physical_plan/udf.rs
index 7355746..85e6b02 100644
--- a/datafusion/src/physical_plan/udf.rs
+++ b/datafusion/src/physical_plan/udf.rs
@@ -33,74 +33,7 @@ use super::{
 };
 use std::sync::Arc;
 
-/// Logical representation of a UDF.
-#[derive(Clone)]
-pub struct ScalarUDF {
-    /// name
-    pub name: String,
-    /// signature
-    pub signature: Signature,
-    /// Return type
-    pub return_type: ReturnTypeFunction,
-    /// actual implementation
-    ///
-    /// The fn param is the wrapped function but be aware that the function will
-    /// be passed with the slice / vec of columnar values (either scalar or array)
-    /// with the exception of zero param function, where a singular element vec
-    /// will be passed. In that case the single element is a null array to indicate
-    /// the batch's row count (so that the generative zero-argument function can know
-    /// the result array size).
-    pub fun: ScalarFunctionImplementation,
-}
-
-impl Debug for ScalarUDF {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("ScalarUDF")
-            .field("name", &self.name)
-            .field("signature", &self.signature)
-            .field("fun", &"<FUNC>")
-            .finish()
-    }
-}
-
-impl PartialEq for ScalarUDF {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name && self.signature == other.signature
-    }
-}
-
-impl std::hash::Hash for ScalarUDF {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-        self.signature.hash(state);
-    }
-}
-
-impl ScalarUDF {
-    /// Create a new ScalarUDF
-    pub fn new(
-        name: &str,
-        signature: &Signature,
-        return_type: &ReturnTypeFunction,
-        fun: &ScalarFunctionImplementation,
-    ) -> Self {
-        Self {
-            name: name.to_owned(),
-            signature: signature.clone(),
-            return_type: return_type.clone(),
-            fun: fun.clone(),
-        }
-    }
-
-    /// creates a logical expression with a call of the UDF
-    /// This utility allows using the UDF without requiring access to the registry.
-    pub fn call(&self, args: Vec<Expr>) -> Expr {
-        Expr::ScalarUDF {
-            fun: Arc::new(self.clone()),
-            args,
-        }
-    }
-}
+pub use datafusion_expr::ScalarUDF;
 
 /// Create a physical expression of the UDF.
 /// This function errors when `args`' can't be coerced to a valid argument type of the UDF.

[arrow-datafusion] 04/05: split expr type and null info to be expr-schemable

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-udf-udaf-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 62c906218ddeb9e419db94786752643bc6e96f64
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Feb 8 21:35:20 2022 +0800

    split expr type and null info to be expr-schemable
---
 datafusion/src/logical_plan/builder.rs             |   1 +
 datafusion/src/logical_plan/expr.rs                | 152 +----------------
 datafusion/src/logical_plan/expr_schema.rs         | 180 +++++++++++++++++++++
 datafusion/src/logical_plan/mod.rs                 |   2 +
 .../src/optimizer/common_subexpr_eliminate.rs      |   2 +-
 datafusion/src/optimizer/simplify_expressions.rs   |   8 +-
 datafusion/tests/simplification.rs                 |   1 +
 7 files changed, 191 insertions(+), 155 deletions(-)

diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index d81fa9d..a722238 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -25,6 +25,7 @@ use crate::datasource::{
     MemTable, TableProvider,
 };
 use crate::error::{DataFusionError, Result};
+use crate::logical_plan::expr_schema::ExprSchemable;
 use crate::logical_plan::plan::{
     Aggregate, Analyze, EmptyRelation, Explain, Filter, Join, Projection, Sort,
     TableScan, ToStringifiedPlan, Union, Window,
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index 69da346..0c1fac4 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -20,13 +20,10 @@
 
 pub use super::Operator;
 use crate::error::{DataFusionError, Result};
-use crate::field_util::get_indexed_field;
+use crate::logical_plan::ExprSchemable;
 use crate::logical_plan::{window_frames, DFField, DFSchema};
 use crate::physical_plan::functions::Volatility;
-use crate::physical_plan::{
-    aggregates, expressions::binary_operator_data_type, functions, udf::ScalarUDF,
-    window_functions,
-};
+use crate::physical_plan::{aggregates, functions, udf::ScalarUDF, window_functions};
 use crate::{physical_plan::udaf::AggregateUDF, scalar::ScalarValue};
 use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction};
 use arrow::{compute::can_cast_types, datatypes::DataType};
@@ -251,151 +248,6 @@ impl PartialOrd for Expr {
 }
 
 impl Expr {
-    /// Returns the [arrow::datatypes::DataType] of the expression
-    /// based on [ExprSchema]
-    ///
-    /// Note: [DFSchema] implements [ExprSchema].
-    ///
-    /// # Errors
-    ///
-    /// This function errors when it is not possible to compute its
-    /// [arrow::datatypes::DataType].  This happens when e.g. the
-    /// expression refers to a column that does not exist in the
-    /// schema, or when the expression is incorrectly typed
-    /// (e.g. `[utf8] + [bool]`).
-    pub fn get_type<S: ExprSchema>(&self, schema: &S) -> Result<DataType> {
-        match self {
-            Expr::Alias(expr, _) | Expr::Sort { expr, .. } | Expr::Negative(expr) => {
-                expr.get_type(schema)
-            }
-            Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
-            Expr::ScalarVariable(_) => Ok(DataType::Utf8),
-            Expr::Literal(l) => Ok(l.get_datatype()),
-            Expr::Case { when_then_expr, .. } => when_then_expr[0].1.get_type(schema),
-            Expr::Cast { data_type, .. } | Expr::TryCast { data_type, .. } => {
-                Ok(data_type.clone())
-            }
-            Expr::ScalarUDF { fun, args } => {
-                let data_types = args
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                Ok((fun.return_type)(&data_types)?.as_ref().clone())
-            }
-            Expr::ScalarFunction { fun, args } => {
-                let data_types = args
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                functions::return_type(fun, &data_types)
-            }
-            Expr::WindowFunction { fun, args, .. } => {
-                let data_types = args
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                window_functions::return_type(fun, &data_types)
-            }
-            Expr::AggregateFunction { fun, args, .. } => {
-                let data_types = args
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                aggregates::return_type(fun, &data_types)
-            }
-            Expr::AggregateUDF { fun, args, .. } => {
-                let data_types = args
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                Ok((fun.return_type)(&data_types)?.as_ref().clone())
-            }
-            Expr::Not(_)
-            | Expr::IsNull(_)
-            | Expr::Between { .. }
-            | Expr::InList { .. }
-            | Expr::IsNotNull(_) => Ok(DataType::Boolean),
-            Expr::BinaryExpr {
-                ref left,
-                ref right,
-                ref op,
-            } => binary_operator_data_type(
-                &left.get_type(schema)?,
-                op,
-                &right.get_type(schema)?,
-            ),
-            Expr::Wildcard => Err(DataFusionError::Internal(
-                "Wildcard expressions are not valid in a logical query plan".to_owned(),
-            )),
-            Expr::GetIndexedField { ref expr, key } => {
-                let data_type = expr.get_type(schema)?;
-
-                get_indexed_field(&data_type, key).map(|x| x.data_type().clone())
-            }
-        }
-    }
-
-    /// Returns the nullability of the expression based on [ExprSchema].
-    ///
-    /// Note: [DFSchema] implements [ExprSchema].
-    ///
-    /// # Errors
-    ///
-    /// This function errors when it is not possible to compute its
-    /// nullability.  This happens when the expression refers to a
-    /// column that does not exist in the schema.
-    pub fn nullable<S: ExprSchema>(&self, input_schema: &S) -> Result<bool> {
-        match self {
-            Expr::Alias(expr, _)
-            | Expr::Not(expr)
-            | Expr::Negative(expr)
-            | Expr::Sort { expr, .. }
-            | Expr::Between { expr, .. }
-            | Expr::InList { expr, .. } => expr.nullable(input_schema),
-            Expr::Column(c) => input_schema.nullable(c),
-            Expr::Literal(value) => Ok(value.is_null()),
-            Expr::Case {
-                when_then_expr,
-                else_expr,
-                ..
-            } => {
-                // this expression is nullable if any of the input expressions are nullable
-                let then_nullable = when_then_expr
-                    .iter()
-                    .map(|(_, t)| t.nullable(input_schema))
-                    .collect::<Result<Vec<_>>>()?;
-                if then_nullable.contains(&true) {
-                    Ok(true)
-                } else if let Some(e) = else_expr {
-                    e.nullable(input_schema)
-                } else {
-                    Ok(false)
-                }
-            }
-            Expr::Cast { expr, .. } => expr.nullable(input_schema),
-            Expr::ScalarVariable(_)
-            | Expr::TryCast { .. }
-            | Expr::ScalarFunction { .. }
-            | Expr::ScalarUDF { .. }
-            | Expr::WindowFunction { .. }
-            | Expr::AggregateFunction { .. }
-            | Expr::AggregateUDF { .. } => Ok(true),
-            Expr::IsNull(_) | Expr::IsNotNull(_) => Ok(false),
-            Expr::BinaryExpr {
-                ref left,
-                ref right,
-                ..
-            } => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?),
-            Expr::Wildcard => Err(DataFusionError::Internal(
-                "Wildcard expressions are not valid in a logical query plan".to_owned(),
-            )),
-            Expr::GetIndexedField { ref expr, key } => {
-                let data_type = expr.get_type(input_schema)?;
-                get_indexed_field(&data_type, key).map(|x| x.is_nullable())
-            }
-        }
-    }
-
     /// Returns the name of this expression based on [crate::logical_plan::DFSchema].
     ///
     /// This represents how a column with this expression is named when no alias is chosen
diff --git a/datafusion/src/logical_plan/expr_schema.rs b/datafusion/src/logical_plan/expr_schema.rs
new file mode 100644
index 0000000..5c128db
--- /dev/null
+++ b/datafusion/src/logical_plan/expr_schema.rs
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::Expr;
+use crate::field_util::get_indexed_field;
+use crate::physical_plan::{
+    aggregates, expressions::binary_operator_data_type, functions, window_functions,
+};
+use arrow::datatypes::DataType;
+use datafusion_common::{DataFusionError, ExprSchema, Result};
+
+/// trait to allow expr to be typable with respect to a schema
+pub trait ExprSchemable {
+    /// given a schema, return the type of the expr
+    fn get_type<S: ExprSchema>(&self, schema: &S) -> Result<DataType>;
+
+    /// given a schema, return the nullability of the expr
+    fn nullable<S: ExprSchema>(&self, input_schema: &S) -> Result<bool>;
+}
+
+impl ExprSchemable for Expr {
+    /// Returns the [arrow::datatypes::DataType] of the expression
+    /// based on [ExprSchema]
+    ///
+    /// Note: [DFSchema] implements [ExprSchema].
+    ///
+    /// # Errors
+    ///
+    /// This function errors when it is not possible to compute its
+    /// [arrow::datatypes::DataType].  This happens when e.g. the
+    /// expression refers to a column that does not exist in the
+    /// schema, or when the expression is incorrectly typed
+    /// (e.g. `[utf8] + [bool]`).
+    fn get_type<S: ExprSchema>(&self, schema: &S) -> Result<DataType> {
+        match self {
+            Expr::Alias(expr, _) | Expr::Sort { expr, .. } | Expr::Negative(expr) => {
+                expr.get_type(schema)
+            }
+            Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
+            Expr::ScalarVariable(_) => Ok(DataType::Utf8),
+            Expr::Literal(l) => Ok(l.get_datatype()),
+            Expr::Case { when_then_expr, .. } => when_then_expr[0].1.get_type(schema),
+            Expr::Cast { data_type, .. } | Expr::TryCast { data_type, .. } => {
+                Ok(data_type.clone())
+            }
+            Expr::ScalarUDF { fun, args } => {
+                let data_types = args
+                    .iter()
+                    .map(|e| e.get_type(schema))
+                    .collect::<Result<Vec<_>>>()?;
+                Ok((fun.return_type)(&data_types)?.as_ref().clone())
+            }
+            Expr::ScalarFunction { fun, args } => {
+                let data_types = args
+                    .iter()
+                    .map(|e| e.get_type(schema))
+                    .collect::<Result<Vec<_>>>()?;
+                functions::return_type(fun, &data_types)
+            }
+            Expr::WindowFunction { fun, args, .. } => {
+                let data_types = args
+                    .iter()
+                    .map(|e| e.get_type(schema))
+                    .collect::<Result<Vec<_>>>()?;
+                window_functions::return_type(fun, &data_types)
+            }
+            Expr::AggregateFunction { fun, args, .. } => {
+                let data_types = args
+                    .iter()
+                    .map(|e| e.get_type(schema))
+                    .collect::<Result<Vec<_>>>()?;
+                aggregates::return_type(fun, &data_types)
+            }
+            Expr::AggregateUDF { fun, args, .. } => {
+                let data_types = args
+                    .iter()
+                    .map(|e| e.get_type(schema))
+                    .collect::<Result<Vec<_>>>()?;
+                Ok((fun.return_type)(&data_types)?.as_ref().clone())
+            }
+            Expr::Not(_)
+            | Expr::IsNull(_)
+            | Expr::Between { .. }
+            | Expr::InList { .. }
+            | Expr::IsNotNull(_) => Ok(DataType::Boolean),
+            Expr::BinaryExpr {
+                ref left,
+                ref right,
+                ref op,
+            } => binary_operator_data_type(
+                &left.get_type(schema)?,
+                op,
+                &right.get_type(schema)?,
+            ),
+            Expr::Wildcard => Err(DataFusionError::Internal(
+                "Wildcard expressions are not valid in a logical query plan".to_owned(),
+            )),
+            Expr::GetIndexedField { ref expr, key } => {
+                let data_type = expr.get_type(schema)?;
+
+                get_indexed_field(&data_type, key).map(|x| x.data_type().clone())
+            }
+        }
+    }
+
+    /// Returns the nullability of the expression based on [ExprSchema].
+    ///
+    /// Note: [DFSchema] implements [ExprSchema].
+    ///
+    /// # Errors
+    ///
+    /// This function errors when it is not possible to compute its
+    /// nullability.  This happens when the expression refers to a
+    /// column that does not exist in the schema.
+    fn nullable<S: ExprSchema>(&self, input_schema: &S) -> Result<bool> {
+        match self {
+            Expr::Alias(expr, _)
+            | Expr::Not(expr)
+            | Expr::Negative(expr)
+            | Expr::Sort { expr, .. }
+            | Expr::Between { expr, .. }
+            | Expr::InList { expr, .. } => expr.nullable(input_schema),
+            Expr::Column(c) => input_schema.nullable(c),
+            Expr::Literal(value) => Ok(value.is_null()),
+            Expr::Case {
+                when_then_expr,
+                else_expr,
+                ..
+            } => {
+                // this expression is nullable if any of the input expressions are nullable
+                let then_nullable = when_then_expr
+                    .iter()
+                    .map(|(_, t)| t.nullable(input_schema))
+                    .collect::<Result<Vec<_>>>()?;
+                if then_nullable.contains(&true) {
+                    Ok(true)
+                } else if let Some(e) = else_expr {
+                    e.nullable(input_schema)
+                } else {
+                    Ok(false)
+                }
+            }
+            Expr::Cast { expr, .. } => expr.nullable(input_schema),
+            Expr::ScalarVariable(_)
+            | Expr::TryCast { .. }
+            | Expr::ScalarFunction { .. }
+            | Expr::ScalarUDF { .. }
+            | Expr::WindowFunction { .. }
+            | Expr::AggregateFunction { .. }
+            | Expr::AggregateUDF { .. } => Ok(true),
+            Expr::IsNull(_) | Expr::IsNotNull(_) => Ok(false),
+            Expr::BinaryExpr {
+                ref left,
+                ref right,
+                ..
+            } => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?),
+            Expr::Wildcard => Err(DataFusionError::Internal(
+                "Wildcard expressions are not valid in a logical query plan".to_owned(),
+            )),
+            Expr::GetIndexedField { ref expr, key } => {
+                let data_type = expr.get_type(input_schema)?;
+                get_indexed_field(&data_type, key).map(|x| x.is_nullable())
+            }
+        }
+    }
+}
diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs
index 085775a..f2ecb0f 100644
--- a/datafusion/src/logical_plan/mod.rs
+++ b/datafusion/src/logical_plan/mod.rs
@@ -26,6 +26,7 @@ mod dfschema;
 mod display;
 mod expr;
 mod expr_rewriter;
+mod expr_schema;
 mod expr_simplier;
 mod expr_visitor;
 mod extension;
@@ -54,6 +55,7 @@ pub use expr_rewriter::{
     normalize_col, normalize_cols, replace_col, rewrite_sort_cols_by_aggs,
     unnormalize_col, unnormalize_cols, ExprRewritable, ExprRewriter, RewriteRecursion,
 };
+pub use expr_schema::ExprSchemable;
 pub use expr_simplier::{ExprSimplifiable, SimplifyInfo};
 pub use expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
 pub use extension::UserDefinedLogicalNode;
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index 5c2219b..2ed45be 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -23,7 +23,7 @@ use crate::logical_plan::plan::{Filter, Projection, Window};
 use crate::logical_plan::{
     col,
     plan::{Aggregate, Sort},
-    DFField, DFSchema, Expr, ExprRewritable, ExprRewriter, ExprVisitable,
+    DFField, DFSchema, Expr, ExprRewritable, ExprRewriter, ExprSchemable, ExprVisitable,
     ExpressionVisitor, LogicalPlan, Recursion, RewriteRecursion,
 };
 use crate::optimizer::optimizer::OptimizerRule;
diff --git a/datafusion/src/optimizer/simplify_expressions.rs b/datafusion/src/optimizer/simplify_expressions.rs
index f8f3df4..4e9709b 100644
--- a/datafusion/src/optimizer/simplify_expressions.rs
+++ b/datafusion/src/optimizer/simplify_expressions.rs
@@ -17,12 +17,9 @@
 
 //! Simplify expressions optimizer rule
 
-use arrow::array::new_null_array;
-use arrow::datatypes::{DataType, Field, Schema};
-use arrow::record_batch::RecordBatch;
-
 use crate::error::DataFusionError;
 use crate::execution::context::ExecutionProps;
+use crate::logical_plan::ExprSchemable;
 use crate::logical_plan::{
     lit, DFSchema, DFSchemaRef, Expr, ExprRewritable, ExprRewriter, ExprSimplifiable,
     LogicalPlan, RewriteRecursion, SimplifyInfo,
@@ -33,6 +30,9 @@ use crate::physical_plan::functions::Volatility;
 use crate::physical_plan::planner::create_physical_expr;
 use crate::scalar::ScalarValue;
 use crate::{error::Result, logical_plan::Operator};
+use arrow::array::new_null_array;
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::record_batch::RecordBatch;
 
 /// Provides simplification information based on schema and properties
 struct SimplifyContext<'a, 'b> {
diff --git a/datafusion/tests/simplification.rs b/datafusion/tests/simplification.rs
index 0ce8e76..fe5f5e2 100644
--- a/datafusion/tests/simplification.rs
+++ b/datafusion/tests/simplification.rs
@@ -18,6 +18,7 @@
 //! This program demonstrates the DataFusion expression simplification API.
 
 use arrow::datatypes::{DataType, Field, Schema};
+use datafusion::logical_plan::ExprSchemable;
 use datafusion::logical_plan::ExprSimplifiable;
 use datafusion::{
     error::Result,

[arrow-datafusion] 03/05: fix imported types

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-udf-udaf-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit b0e68008815ece43f2c41439c8c12fd0b70caf34
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Tue Feb 8 20:59:43 2022 +0800

    fix imported types
---
 datafusion-expr/src/lib.rs                 |  28 +++++++
 datafusion-expr/src/udaf.rs                | 115 +++++++++++++----------------
 datafusion-expr/src/udf.rs                 | 114 +++++++++++++---------------
 datafusion/src/physical_plan/aggregates.rs |   9 ---
 datafusion/src/physical_plan/functions.rs  |  15 ----
 datafusion/src/physical_plan/udaf.rs       |  67 +----------------
 6 files changed, 131 insertions(+), 217 deletions(-)

diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index 0afc910..6c553d1 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -25,6 +25,34 @@ mod udf;
 mod window_frame;
 mod window_function;
 
+use arrow::datatypes::DataType;
+use datafusion_common::{ColumnarValue, Result};
+use std::sync::Arc;
+
+/// Scalar function
+///
+/// The Fn param is the wrapped function, but be aware that the function is
+/// passed a slice / vec of columnar values (either scalar or array),
+/// except for a zero-parameter function, which is passed a single-element
+/// vec. In that case the single element is a null array that indicates
+/// the batch's row count (so that the generative zero-argument function can know
+/// the result array size).
+pub type ScalarFunctionImplementation =
+  Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
+
+/// A function's return type
+pub type ReturnTypeFunction =
+  Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
+
+/// the implementation of an aggregate function
+pub type AccumulatorFunctionImplementation =
+  Arc<dyn Fn() -> Result<Box<dyn Accumulator>> + Send + Sync>;
+
+/// This signature describes the types into which an aggregator serializes
+/// its state, given its return datatype.
+pub type StateTypeFunction =
+  Arc<dyn Fn(&DataType) -> Result<Arc<Vec<DataType>>> + Send + Sync>;
+
 pub use accumulator::Accumulator;
 pub use aggregate_function::AggregateFunction;
 pub use built_in_function::BuiltinScalarFunction;
diff --git a/datafusion-expr/src/udaf.rs b/datafusion-expr/src/udaf.rs
index 3fea4d9..142cfe1 100644
--- a/datafusion-expr/src/udaf.rs
+++ b/datafusion-expr/src/udaf.rs
@@ -17,91 +17,76 @@
 
 //! This module contains functions and structs supporting user-defined aggregate functions.
 
-use fmt::{Debug, Formatter};
-use std::any::Any;
-use std::fmt;
-
-use arrow::{
-    datatypes::Field,
-    datatypes::{DataType, Schema},
-};
-
-use crate::physical_plan::PhysicalExpr;
-use crate::{error::Result, logical_plan::Expr};
-
-use super::{
-    aggregates::AccumulatorFunctionImplementation,
-    aggregates::StateTypeFunction,
-    expressions::format_state_name,
-    functions::{ReturnTypeFunction, Signature},
-    type_coercion::coerce,
-    Accumulator, AggregateExpr,
+use crate::Expr;
+use crate::{
+  AccumulatorFunctionImplementation, ReturnTypeFunction, Signature, StateTypeFunction,
 };
+use std::fmt::{self, Debug, Formatter};
 use std::sync::Arc;
 
 /// Logical representation of a user-defined aggregate function (UDAF)
 /// A UDAF is different from a UDF in that it is stateful across batches.
 #[derive(Clone)]
 pub struct AggregateUDF {
-    /// name
-    pub name: String,
-    /// signature
-    pub signature: Signature,
-    /// Return type
-    pub return_type: ReturnTypeFunction,
-    /// actual implementation
-    pub accumulator: AccumulatorFunctionImplementation,
-    /// the accumulator's state's description as a function of the return type
-    pub state_type: StateTypeFunction,
+  /// name
+  pub name: String,
+  /// signature
+  pub signature: Signature,
+  /// Return type
+  pub return_type: ReturnTypeFunction,
+  /// actual implementation
+  pub accumulator: AccumulatorFunctionImplementation,
+  /// the accumulator's state's description as a function of the return type
+  pub state_type: StateTypeFunction,
 }
 
 impl Debug for AggregateUDF {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("AggregateUDF")
-            .field("name", &self.name)
-            .field("signature", &self.signature)
-            .field("fun", &"<FUNC>")
-            .finish()
-    }
+  fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+    f.debug_struct("AggregateUDF")
+      .field("name", &self.name)
+      .field("signature", &self.signature)
+      .field("fun", &"<FUNC>")
+      .finish()
+  }
 }
 
 impl PartialEq for AggregateUDF {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name && self.signature == other.signature
-    }
+  fn eq(&self, other: &Self) -> bool {
+    self.name == other.name && self.signature == other.signature
+  }
 }
 
 impl std::hash::Hash for AggregateUDF {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-        self.signature.hash(state);
-    }
+  fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+    self.name.hash(state);
+    self.signature.hash(state);
+  }
 }
 
 impl AggregateUDF {
-    /// Create a new AggregateUDF
-    pub fn new(
-        name: &str,
-        signature: &Signature,
-        return_type: &ReturnTypeFunction,
-        accumulator: &AccumulatorFunctionImplementation,
-        state_type: &StateTypeFunction,
-    ) -> Self {
-        Self {
-            name: name.to_owned(),
-            signature: signature.clone(),
-            return_type: return_type.clone(),
-            accumulator: accumulator.clone(),
-            state_type: state_type.clone(),
-        }
+  /// Create a new AggregateUDF
+  pub fn new(
+    name: &str,
+    signature: &Signature,
+    return_type: &ReturnTypeFunction,
+    accumulator: &AccumulatorFunctionImplementation,
+    state_type: &StateTypeFunction,
+  ) -> Self {
+    Self {
+      name: name.to_owned(),
+      signature: signature.clone(),
+      return_type: return_type.clone(),
+      accumulator: accumulator.clone(),
+      state_type: state_type.clone(),
     }
+  }
 
-    /// creates a logical expression with a call of the UDAF
-    /// This utility allows using the UDAF without requiring access to the registry.
-    pub fn call(&self, args: Vec<Expr>) -> Expr {
-        Expr::AggregateUDF {
-            fun: Arc::new(self.clone()),
-            args,
-        }
+  /// creates a logical expression with a call of the UDAF
+  /// This utility allows using the UDAF without requiring access to the registry.
+  pub fn call(&self, args: Vec<Expr>) -> Expr {
+    Expr::AggregateUDF {
+      fun: Arc::new(self.clone()),
+      args,
     }
+  }
 }
diff --git a/datafusion-expr/src/udf.rs b/datafusion-expr/src/udf.rs
index 9e7bebc..247d6a2 100644
--- a/datafusion-expr/src/udf.rs
+++ b/datafusion-expr/src/udf.rs
@@ -17,87 +17,77 @@
 
 //! UDF support
 
-use fmt::{Debug, Formatter};
+use crate::{Expr, ReturnTypeFunction, ScalarFunctionImplementation, Signature};
 use std::fmt;
-
-use arrow::datatypes::Schema;
-
-use crate::error::Result;
-use crate::{logical_plan::Expr, physical_plan::PhysicalExpr};
-
-use super::{
-    functions::{
-        ReturnTypeFunction, ScalarFunctionExpr, ScalarFunctionImplementation, Signature,
-    },
-    type_coercion::coerce,
-};
+use std::fmt::Debug;
+use std::fmt::Formatter;
 use std::sync::Arc;
 
 /// Logical representation of a UDF.
 #[derive(Clone)]
 pub struct ScalarUDF {
-    /// name
-    pub name: String,
-    /// signature
-    pub signature: Signature,
-    /// Return type
-    pub return_type: ReturnTypeFunction,
-    /// actual implementation
-    ///
-    /// The fn param is the wrapped function but be aware that the function will
-    /// be passed with the slice / vec of columnar values (either scalar or array)
-    /// with the exception of zero param function, where a singular element vec
-    /// will be passed. In that case the single element is a null array to indicate
-    /// the batch's row count (so that the generative zero-argument function can know
-    /// the result array size).
-    pub fun: ScalarFunctionImplementation,
+  /// name
+  pub name: String,
+  /// signature
+  pub signature: Signature,
+  /// Return type
+  pub return_type: ReturnTypeFunction,
+  /// actual implementation
+  ///
+  /// The fn param is the wrapped function but be aware that the function will
+  /// be passed with the slice / vec of columnar values (either scalar or array)
+  /// with the exception of zero param function, where a singular element vec
+  /// will be passed. In that case the single element is a null array to indicate
+  /// the batch's row count (so that the generative zero-argument function can know
+  /// the result array size).
+  pub fun: ScalarFunctionImplementation,
 }
 
 impl Debug for ScalarUDF {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("ScalarUDF")
-            .field("name", &self.name)
-            .field("signature", &self.signature)
-            .field("fun", &"<FUNC>")
-            .finish()
-    }
+  fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+    f.debug_struct("ScalarUDF")
+      .field("name", &self.name)
+      .field("signature", &self.signature)
+      .field("fun", &"<FUNC>")
+      .finish()
+  }
 }
 
 impl PartialEq for ScalarUDF {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name && self.signature == other.signature
-    }
+  fn eq(&self, other: &Self) -> bool {
+    self.name == other.name && self.signature == other.signature
+  }
 }
 
 impl std::hash::Hash for ScalarUDF {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-        self.signature.hash(state);
-    }
+  fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+    self.name.hash(state);
+    self.signature.hash(state);
+  }
 }
 
 impl ScalarUDF {
-    /// Create a new ScalarUDF
-    pub fn new(
-        name: &str,
-        signature: &Signature,
-        return_type: &ReturnTypeFunction,
-        fun: &ScalarFunctionImplementation,
-    ) -> Self {
-        Self {
-            name: name.to_owned(),
-            signature: signature.clone(),
-            return_type: return_type.clone(),
-            fun: fun.clone(),
-        }
+  /// Create a new ScalarUDF
+  pub fn new(
+    name: &str,
+    signature: &Signature,
+    return_type: &ReturnTypeFunction,
+    fun: &ScalarFunctionImplementation,
+  ) -> Self {
+    Self {
+      name: name.to_owned(),
+      signature: signature.clone(),
+      return_type: return_type.clone(),
+      fun: fun.clone(),
     }
+  }
 
-    /// creates a logical expression with a call of the UDF
-    /// This utility allows using the UDF without requiring access to the registry.
-    pub fn call(&self, args: Vec<Expr>) -> Expr {
-        Expr::ScalarUDF {
-            fun: Arc::new(self.clone()),
-            args,
-        }
+  /// creates a logical expression with a call of the UDF
+  /// This utility allows using the UDF without requiring access to the registry.
+  pub fn call(&self, args: Vec<Expr>) -> Expr {
+    Expr::ScalarUDF {
+      fun: Arc::new(self.clone()),
+      args,
     }
+  }
 }
diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs
index a1531d4..656b9c8 100644
--- a/datafusion/src/physical_plan/aggregates.rs
+++ b/datafusion/src/physical_plan/aggregates.rs
@@ -40,15 +40,6 @@ use expressions::{
 };
 use std::sync::Arc;
 
-/// the implementation of an aggregate function
-pub type AccumulatorFunctionImplementation =
-    Arc<dyn Fn() -> Result<Box<dyn Accumulator>> + Send + Sync>;
-
-/// This signature corresponds to which types an aggregator serializes
-/// its state, given its return datatype.
-pub type StateTypeFunction =
-    Arc<dyn Fn(&DataType) -> Result<Arc<Vec<DataType>>> + Send + Sync>;
-
 pub use datafusion_expr::AggregateFunction;
 
 /// Returns the datatype of the aggregate function.
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index 56e8f17..586b2ca 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -59,21 +59,6 @@ use std::{any::Any, fmt, sync::Arc};
 
 pub use datafusion_expr::{BuiltinScalarFunction, Signature, TypeSignature, Volatility};
 
-/// Scalar function
-///
-/// The Fn param is the wrapped function but be aware that the function will
-/// be passed with the slice / vec of columnar values (either scalar or array)
-/// with the exception of zero param function, where a singular element vec
-/// will be passed. In that case the single element is a null array to indicate
-/// the batch's row count (so that the generative zero-argument function can know
-/// the result array size).
-pub type ScalarFunctionImplementation =
-    Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
-
-/// A function's return type
-pub type ReturnTypeFunction =
-    Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
-
 macro_rules! make_utf8_to_return_type {
     ($FUNC:ident, $largeUtf8Type:expr, $utf8Type:expr) => {
         fn $FUNC(arg_type: &DataType, name: &str) -> Result<DataType> {
diff --git a/datafusion/src/physical_plan/udaf.rs b/datafusion/src/physical_plan/udaf.rs
index 0de696d..0e61cb4 100644
--- a/datafusion/src/physical_plan/udaf.rs
+++ b/datafusion/src/physical_plan/udaf.rs
@@ -39,72 +39,7 @@ use super::{
 };
 use std::sync::Arc;
 
-/// Logical representation of a user-defined aggregate function (UDAF)
-/// A UDAF is different from a UDF in that it is stateful across batches.
-#[derive(Clone)]
-pub struct AggregateUDF {
-    /// name
-    pub name: String,
-    /// signature
-    pub signature: Signature,
-    /// Return type
-    pub return_type: ReturnTypeFunction,
-    /// actual implementation
-    pub accumulator: AccumulatorFunctionImplementation,
-    /// the accumulator's state's description as a function of the return type
-    pub state_type: StateTypeFunction,
-}
-
-impl Debug for AggregateUDF {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("AggregateUDF")
-            .field("name", &self.name)
-            .field("signature", &self.signature)
-            .field("fun", &"<FUNC>")
-            .finish()
-    }
-}
-
-impl PartialEq for AggregateUDF {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name && self.signature == other.signature
-    }
-}
-
-impl std::hash::Hash for AggregateUDF {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-        self.signature.hash(state);
-    }
-}
-
-impl AggregateUDF {
-    /// Create a new AggregateUDF
-    pub fn new(
-        name: &str,
-        signature: &Signature,
-        return_type: &ReturnTypeFunction,
-        accumulator: &AccumulatorFunctionImplementation,
-        state_type: &StateTypeFunction,
-    ) -> Self {
-        Self {
-            name: name.to_owned(),
-            signature: signature.clone(),
-            return_type: return_type.clone(),
-            accumulator: accumulator.clone(),
-            state_type: state_type.clone(),
-        }
-    }
-
-    /// creates a logical expression with a call of the UDAF
-    /// This utility allows using the UDAF without requiring access to the registry.
-    pub fn call(&self, args: Vec<Expr>) -> Expr {
-        Expr::AggregateUDF {
-            fun: Arc::new(self.clone()),
-            args,
-        }
-    }
-}
+pub use datafusion_expr::AggregateUDF;
 
 /// Creates a physical expression of the UDAF, that includes all necessary type coercion.
 /// This function errors when `args`' can't be coerced to a valid argument type of the UDAF.

[arrow-datafusion] 01/05: pyarrow

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-udf-udaf-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 6c312cc6d17fb0663f850b8ced2b50b6b49a1875
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 13:29:20 2022 +0800

    pyarrow
---
 datafusion-common/src/pyarrow.rs          |  6 ----
 datafusion-expr/src/accumulator.rs        | 44 +++++++++++++++++++++++
 datafusion-expr/src/columnar_value.rs     | 60 +++++++++++++++++++++++++++++++
 datafusion-expr/src/lib.rs                |  2 ++
 datafusion/src/physical_plan/functions.rs | 14 ++------
 datafusion/src/physical_plan/mod.rs       | 52 ++-------------------------
 6 files changed, 110 insertions(+), 68 deletions(-)

diff --git a/datafusion-common/src/pyarrow.rs b/datafusion-common/src/pyarrow.rs
index bf10b45..1ee88d7 100644
--- a/datafusion-common/src/pyarrow.rs
+++ b/datafusion-common/src/pyarrow.rs
@@ -25,12 +25,6 @@ use pyo3::prelude::PyErr;
 use pyo3::types::PyList;
 use pyo3::{FromPyObject, IntoPy, PyAny, PyObject, PyResult, Python};
 
-impl From<DataFusionError> for PyErr {
-    fn from(err: DataFusionError) -> PyErr {
-        PyException::new_err(err.to_string())
-    }
-}
-
 impl PyArrowConvert for ScalarValue {
     fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
         let py = value.py();
diff --git a/datafusion-expr/src/accumulator.rs b/datafusion-expr/src/accumulator.rs
new file mode 100644
index 0000000..599bd36
--- /dev/null
+++ b/datafusion-expr/src/accumulator.rs
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::ArrayRef;
+use datafusion_common::{Result, ScalarValue};
+use std::fmt::Debug;
+
+/// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and
+/// generically accumulates values.
+///
+/// An accumulator knows how to:
+/// * update its state from inputs via `update_batch`
+/// * convert its internal state to a vector of scalar values
+/// * update its state from multiple accumulators' states via `merge_batch`
+/// * compute the final value from its internal state via `evaluate`
+pub trait Accumulator: Send + Sync + Debug {
+    /// Returns the state of the accumulator at the end of the accumulation.
+    /// For example, for an average that tracks `sum` and `n`, this function should return a vector
+    /// of two values, sum and n.
+    fn state(&self) -> Result<Vec<ScalarValue>>;
+
+    /// updates the accumulator's state from a vector of arrays.
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
+
+    /// updates the accumulator's state from a vector of states.
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;
+
+    /// returns its value based on its current state.
+    fn evaluate(&self) -> Result<ScalarValue>;
+}
diff --git a/datafusion-expr/src/columnar_value.rs b/datafusion-expr/src/columnar_value.rs
new file mode 100644
index 0000000..5e6959d
--- /dev/null
+++ b/datafusion-expr/src/columnar_value.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::ArrayRef;
+use arrow::array::NullArray;
+use arrow::datatypes::DataType;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::ScalarValue;
+use std::sync::Arc;
+
+/// Represents the result from an expression
+#[derive(Clone)]
+pub enum ColumnarValue {
+    /// Array of values
+    Array(ArrayRef),
+    /// A single value
+    Scalar(ScalarValue),
+}
+
+impl ColumnarValue {
+    pub fn data_type(&self) -> DataType {
+        match self {
+            ColumnarValue::Array(array_value) => array_value.data_type().clone(),
+            ColumnarValue::Scalar(scalar_value) => scalar_value.get_datatype(),
+        }
+    }
+
+    /// Convert a columnar value into an ArrayRef
+    pub fn into_array(self, num_rows: usize) -> ArrayRef {
+        match self {
+            ColumnarValue::Array(array) => array,
+            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
+        }
+    }
+}
+
+/// null columnar values are implemented as a null array in order to pass batch
+/// num_rows
+pub type NullColumnarValue = ColumnarValue;
+
+impl From<&RecordBatch> for NullColumnarValue {
+    fn from(batch: &RecordBatch) -> Self {
+        let num_rows = batch.num_rows();
+        ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
+    }
+}
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index 7dcddc3..eb86b0b 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod accumulator;
 mod aggregate_function;
 mod built_in_function;
 mod operator;
@@ -22,6 +23,7 @@ mod signature;
 mod window_frame;
 mod window_function;
 
+pub use accumulator::Accumulator;
 pub use aggregate_function::AggregateFunction;
 pub use built_in_function::BuiltinScalarFunction;
 pub use operator::Operator;
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index 9582eec..56e8f17 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -46,12 +46,13 @@ use crate::{
     scalar::ScalarValue,
 };
 use arrow::{
-    array::{ArrayRef, NullArray},
+    array::ArrayRef,
     compute::kernels::length::{bit_length, length},
     datatypes::TimeUnit,
     datatypes::{DataType, Field, Int32Type, Int64Type, Schema},
     record_batch::RecordBatch,
 };
+pub use datafusion_expr::NullColumnarValue;
 use fmt::{Debug, Formatter};
 use std::convert::From;
 use std::{any::Any, fmt, sync::Arc};
@@ -1206,17 +1207,6 @@ impl fmt::Display for ScalarFunctionExpr {
     }
 }
 
-/// null columnar values are implemented as a null array in order to pass batch
-/// num_rows
-type NullColumnarValue = ColumnarValue;
-
-impl From<&RecordBatch> for NullColumnarValue {
-    fn from(batch: &RecordBatch) -> Self {
-        let num_rows = batch.num_rows();
-        ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
-    }
-}
-
 impl PhysicalExpr for ScalarFunctionExpr {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index ac70f2f..38a19db 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -35,6 +35,8 @@ use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 use arrow::{array::ArrayRef, datatypes::Field};
 use async_trait::async_trait;
+pub use datafusion_expr::Accumulator;
+pub use datafusion_expr::ColumnarValue;
 pub use display::DisplayFormatType;
 use futures::stream::Stream;
 use std::fmt;
@@ -419,32 +421,6 @@ pub enum Distribution {
     HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
 }
 
-/// Represents the result from an expression
-#[derive(Clone)]
-pub enum ColumnarValue {
-    /// Array of values
-    Array(ArrayRef),
-    /// A single value
-    Scalar(ScalarValue),
-}
-
-impl ColumnarValue {
-    fn data_type(&self) -> DataType {
-        match self {
-            ColumnarValue::Array(array_value) => array_value.data_type().clone(),
-            ColumnarValue::Scalar(scalar_value) => scalar_value.get_datatype(),
-        }
-    }
-
-    /// Convert a columnar value into an ArrayRef
-    pub fn into_array(self, num_rows: usize) -> ArrayRef {
-        match self {
-            ColumnarValue::Array(array) => array,
-            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
-        }
-    }
-}
-
 /// Expression that can be evaluated against a RecordBatch
 /// A Physical expression knows its type, nullability and how to evaluate itself.
 pub trait PhysicalExpr: Send + Sync + Display + Debug {
@@ -578,30 +554,6 @@ pub trait WindowExpr: Send + Sync + Debug {
     }
 }
 
-/// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and
-/// generically accumulates values.
-///
-/// An accumulator knows how to:
-/// * update its state from inputs via `update_batch`
-/// * convert its internal state to a vector of scalar values
-/// * update its state from multiple accumulators' states via `merge_batch`
-/// * compute the final value from its internal state via `evaluate`
-pub trait Accumulator: Send + Sync + Debug {
-    /// Returns the state of the accumulator at the end of the accumulation.
-    // in the case of an average on which we track `sum` and `n`, this function should return a vector
-    // of two values, sum and n.
-    fn state(&self) -> Result<Vec<ScalarValue>>;
-
-    /// updates the accumulator's state from a vector of arrays.
-    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
-
-    /// updates the accumulator's state from a vector of states.
-    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;
-
-    /// returns its value based on its current state.
-    fn evaluate(&self) -> Result<ScalarValue>;
-}
-
 /// Applies an optional projection to a [`SchemaRef`], returning the
 /// projected schema
 ///