You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/08 15:12:33 UTC
[arrow-datafusion] 01/01: pyarrow
This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch move-udf-udaf-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit 4890c5a5f8a99cc67c6c6c244090102a70d91c39
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 13:29:20 2022 +0800
pyarrow
---
datafusion-expr/src/accumulator.rs | 44 ++++++++++++++
datafusion-expr/src/columnar_value.rs | 60 +++++++++++++++++++
datafusion-expr/src/{lib.rs => expr.rs} | 14 -----
datafusion-expr/src/lib.rs | 36 ++++++++++++
datafusion-expr/src/udaf.rs | 92 +++++++++++++++++++++++++++++
datafusion-expr/src/udf.rs | 93 ++++++++++++++++++++++++++++++
datafusion/src/physical_plan/aggregates.rs | 9 ---
datafusion/src/physical_plan/functions.rs | 29 +---------
datafusion/src/physical_plan/mod.rs | 52 +----------------
datafusion/src/physical_plan/udaf.rs | 67 +--------------------
datafusion/src/physical_plan/udf.rs | 69 +---------------------
11 files changed, 331 insertions(+), 234 deletions(-)
diff --git a/datafusion-expr/src/accumulator.rs b/datafusion-expr/src/accumulator.rs
new file mode 100644
index 0000000..599bd36
--- /dev/null
+++ b/datafusion-expr/src/accumulator.rs
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::ArrayRef;
+use datafusion_common::{Result, ScalarValue};
+use std::fmt::Debug;
+
+/// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and
+/// generically accumulates values.
+///
+/// An accumulator knows how to:
+/// * update its state from inputs via `update_batch`
+/// * convert its internal state to a vector of scalar values via `state`
+/// * update its state from multiple accumulators' states via `merge_batch`
+/// * compute the final value from its internal state via `evaluate`
+pub trait Accumulator: Send + Sync + Debug {
+ /// Returns the state of the accumulator at the end of the accumulation.
+ /// For example, an average that tracks `sum` and `n` should return a vector
+ /// of two values: `sum` and `n`.
+ fn state(&self) -> Result<Vec<ScalarValue>>;
+
+ /// updates the accumulator's state from a vector of arrays.
+ fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
+
+ /// updates the accumulator's state from a vector of states.
+ fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;
+
+ /// returns its value based on its current state.
+ fn evaluate(&self) -> Result<ScalarValue>;
+}
diff --git a/datafusion-expr/src/columnar_value.rs b/datafusion-expr/src/columnar_value.rs
new file mode 100644
index 0000000..5e6959d
--- /dev/null
+++ b/datafusion-expr/src/columnar_value.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::ArrayRef;
+use arrow::array::NullArray;
+use arrow::datatypes::DataType;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::ScalarValue;
+use std::sync::Arc;
+
+/// Represents the result from an expression
+#[derive(Clone)]
+pub enum ColumnarValue {
+ /// Array of values
+ Array(ArrayRef),
+ /// A single value
+ Scalar(ScalarValue),
+}
+
+impl ColumnarValue {
+ pub fn data_type(&self) -> DataType {
+ match self {
+ ColumnarValue::Array(array_value) => array_value.data_type().clone(),
+ ColumnarValue::Scalar(scalar_value) => scalar_value.get_datatype(),
+ }
+ }
+
+ /// Convert a columnar value into an ArrayRef
+ pub fn into_array(self, num_rows: usize) -> ArrayRef {
+ match self {
+ ColumnarValue::Array(array) => array,
+ ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
+ }
+ }
+}
+
+/// null columnar values are implemented as a null array in order to pass batch
+/// num_rows
+pub type NullColumnarValue = ColumnarValue;
+
+impl From<&RecordBatch> for NullColumnarValue {
+ fn from(batch: &RecordBatch) -> Self {
+ let num_rows = batch.num_rows();
+ ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
+ }
+}
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/expr.rs
similarity index 64%
copy from datafusion-expr/src/lib.rs
copy to datafusion-expr/src/expr.rs
index 7dcddc3..b248758 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/expr.rs
@@ -14,17 +14,3 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-
-mod aggregate_function;
-mod built_in_function;
-mod operator;
-mod signature;
-mod window_frame;
-mod window_function;
-
-pub use aggregate_function::AggregateFunction;
-pub use built_in_function::BuiltinScalarFunction;
-pub use operator::Operator;
-pub use signature::{Signature, TypeSignature, Volatility};
-pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
-pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index 7dcddc3..2dc9c17 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -15,16 +15,52 @@
// specific language governing permissions and limitations
// under the License.
+mod accumulator;
mod aggregate_function;
mod built_in_function;
+mod columnar_value;
mod operator;
mod signature;
+mod udaf;
+mod udf;
mod window_frame;
mod window_function;
+use arrow::datatypes::DataType;
+use datafusion_common::Result;
+use std::sync::Arc;
+
+/// Scalar function
+///
+/// The wrapped function is passed a slice of columnar values (each either a
+/// scalar or an array). The exception is a zero-argument function, which is
+/// passed a single-element slice instead. In that case the single element is
+/// a null array that indicates the batch's row count (so that a generative
+/// zero-argument function can know the
+/// result array size).
+pub type ScalarFunctionImplementation =
+ Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
+
+/// A function's return type
+pub type ReturnTypeFunction =
+ Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
+
+/// the implementation of an aggregate function
+pub type AccumulatorFunctionImplementation =
+ Arc<dyn Fn() -> Result<Box<dyn Accumulator>> + Send + Sync>;
+
+/// This function returns the types into which an aggregator serializes
+/// its state, given its return datatype.
+pub type StateTypeFunction =
+ Arc<dyn Fn(&DataType) -> Result<Arc<Vec<DataType>>> + Send + Sync>;
+
+pub use accumulator::Accumulator;
pub use aggregate_function::AggregateFunction;
pub use built_in_function::BuiltinScalarFunction;
+pub use columnar_value::ColumnarValue;
pub use operator::Operator;
pub use signature::{Signature, TypeSignature, Volatility};
+pub use udaf::AggregateUDF;
+pub use udf::ScalarUDF;
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion-expr/src/udaf.rs b/datafusion-expr/src/udaf.rs
new file mode 100644
index 0000000..142cfe1
--- /dev/null
+++ b/datafusion-expr/src/udaf.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains functions and structs supporting user-defined aggregate functions.
+
+use crate::Expr;
+use crate::{
+ AccumulatorFunctionImplementation, ReturnTypeFunction, Signature, StateTypeFunction,
+};
+use std::fmt::{self, Debug, Formatter};
+use std::sync::Arc;
+
+/// Logical representation of a user-defined aggregate function (UDAF)
+/// A UDAF is different from a UDF in that it is stateful across batches.
+#[derive(Clone)]
+pub struct AggregateUDF {
+ /// name
+ pub name: String,
+ /// signature
+ pub signature: Signature,
+ /// Return type
+ pub return_type: ReturnTypeFunction,
+ /// actual implementation
+ pub accumulator: AccumulatorFunctionImplementation,
+ /// the accumulator's state's description as a function of the return type
+ pub state_type: StateTypeFunction,
+}
+
+impl Debug for AggregateUDF {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ f.debug_struct("AggregateUDF")
+ .field("name", &self.name)
+ .field("signature", &self.signature)
+ .field("fun", &"<FUNC>")
+ .finish()
+ }
+}
+
+impl PartialEq for AggregateUDF {
+ fn eq(&self, other: &Self) -> bool {
+ self.name == other.name && self.signature == other.signature
+ }
+}
+
+impl std::hash::Hash for AggregateUDF {
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ self.name.hash(state);
+ self.signature.hash(state);
+ }
+}
+
+impl AggregateUDF {
+ /// Create a new AggregateUDF
+ pub fn new(
+ name: &str,
+ signature: &Signature,
+ return_type: &ReturnTypeFunction,
+ accumulator: &AccumulatorFunctionImplementation,
+ state_type: &StateTypeFunction,
+ ) -> Self {
+ Self {
+ name: name.to_owned(),
+ signature: signature.clone(),
+ return_type: return_type.clone(),
+ accumulator: accumulator.clone(),
+ state_type: state_type.clone(),
+ }
+ }
+
+ /// creates a logical expression with a call of the UDAF
+ /// This utility allows using the UDAF without requiring access to the registry.
+ pub fn call(&self, args: Vec<Expr>) -> Expr {
+ Expr::AggregateUDF {
+ fun: Arc::new(self.clone()),
+ args,
+ }
+ }
+}
diff --git a/datafusion-expr/src/udf.rs b/datafusion-expr/src/udf.rs
new file mode 100644
index 0000000..247d6a2
--- /dev/null
+++ b/datafusion-expr/src/udf.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! UDF support
+
+use crate::{Expr, ReturnTypeFunction, ScalarFunctionImplementation, Signature};
+use std::fmt;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+/// Logical representation of a UDF.
+#[derive(Clone)]
+pub struct ScalarUDF {
+ /// name
+ pub name: String,
+ /// signature
+ pub signature: Signature,
+ /// Return type
+ pub return_type: ReturnTypeFunction,
+ /// actual implementation
+ ///
+ /// The wrapped function is passed a slice of columnar values (each either a
+ /// scalar or an array). The exception is a zero-argument function, which is
+ /// passed a single-element slice instead. In that case the single element is
+ /// a null array that indicates the batch's row count (so that a generative
+ /// zero-argument function can know the
+ /// result array size).
+ pub fun: ScalarFunctionImplementation,
+}
+
+impl Debug for ScalarUDF {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ f.debug_struct("ScalarUDF")
+ .field("name", &self.name)
+ .field("signature", &self.signature)
+ .field("fun", &"<FUNC>")
+ .finish()
+ }
+}
+
+impl PartialEq for ScalarUDF {
+ fn eq(&self, other: &Self) -> bool {
+ self.name == other.name && self.signature == other.signature
+ }
+}
+
+impl std::hash::Hash for ScalarUDF {
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ self.name.hash(state);
+ self.signature.hash(state);
+ }
+}
+
+impl ScalarUDF {
+ /// Create a new ScalarUDF
+ pub fn new(
+ name: &str,
+ signature: &Signature,
+ return_type: &ReturnTypeFunction,
+ fun: &ScalarFunctionImplementation,
+ ) -> Self {
+ Self {
+ name: name.to_owned(),
+ signature: signature.clone(),
+ return_type: return_type.clone(),
+ fun: fun.clone(),
+ }
+ }
+
+ /// creates a logical expression with a call of the UDF
+ /// This utility allows using the UDF without requiring access to the registry.
+ pub fn call(&self, args: Vec<Expr>) -> Expr {
+ Expr::ScalarUDF {
+ fun: Arc::new(self.clone()),
+ args,
+ }
+ }
+}
diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs
index a1531d4..656b9c8 100644
--- a/datafusion/src/physical_plan/aggregates.rs
+++ b/datafusion/src/physical_plan/aggregates.rs
@@ -40,15 +40,6 @@ use expressions::{
};
use std::sync::Arc;
-/// the implementation of an aggregate function
-pub type AccumulatorFunctionImplementation =
- Arc<dyn Fn() -> Result<Box<dyn Accumulator>> + Send + Sync>;
-
-/// This signature corresponds to which types an aggregator serializes
-/// its state, given its return datatype.
-pub type StateTypeFunction =
- Arc<dyn Fn(&DataType) -> Result<Arc<Vec<DataType>>> + Send + Sync>;
-
pub use datafusion_expr::AggregateFunction;
/// Returns the datatype of the aggregate function.
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index 9582eec..586b2ca 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -46,33 +46,19 @@ use crate::{
scalar::ScalarValue,
};
use arrow::{
- array::{ArrayRef, NullArray},
+ array::ArrayRef,
compute::kernels::length::{bit_length, length},
datatypes::TimeUnit,
datatypes::{DataType, Field, Int32Type, Int64Type, Schema},
record_batch::RecordBatch,
};
+pub use datafusion_expr::NullColumnarValue;
use fmt::{Debug, Formatter};
use std::convert::From;
use std::{any::Any, fmt, sync::Arc};
pub use datafusion_expr::{BuiltinScalarFunction, Signature, TypeSignature, Volatility};
-/// Scalar function
-///
-/// The Fn param is the wrapped function but be aware that the function will
-/// be passed with the slice / vec of columnar values (either scalar or array)
-/// with the exception of zero param function, where a singular element vec
-/// will be passed. In that case the single element is a null array to indicate
-/// the batch's row count (so that the generative zero-argument function can know
-/// the result array size).
-pub type ScalarFunctionImplementation =
- Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
-
-/// A function's return type
-pub type ReturnTypeFunction =
- Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
-
macro_rules! make_utf8_to_return_type {
($FUNC:ident, $largeUtf8Type:expr, $utf8Type:expr) => {
fn $FUNC(arg_type: &DataType, name: &str) -> Result<DataType> {
@@ -1206,17 +1192,6 @@ impl fmt::Display for ScalarFunctionExpr {
}
}
-/// null columnar values are implemented as a null array in order to pass batch
-/// num_rows
-type NullColumnarValue = ColumnarValue;
-
-impl From<&RecordBatch> for NullColumnarValue {
- fn from(batch: &RecordBatch) -> Self {
- let num_rows = batch.num_rows();
- ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
- }
-}
-
impl PhysicalExpr for ScalarFunctionExpr {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index ac70f2f..38a19db 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -35,6 +35,8 @@ use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, datatypes::Field};
use async_trait::async_trait;
+pub use datafusion_expr::Accumulator;
+pub use datafusion_expr::ColumnarValue;
pub use display::DisplayFormatType;
use futures::stream::Stream;
use std::fmt;
@@ -419,32 +421,6 @@ pub enum Distribution {
HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
}
-/// Represents the result from an expression
-#[derive(Clone)]
-pub enum ColumnarValue {
- /// Array of values
- Array(ArrayRef),
- /// A single value
- Scalar(ScalarValue),
-}
-
-impl ColumnarValue {
- fn data_type(&self) -> DataType {
- match self {
- ColumnarValue::Array(array_value) => array_value.data_type().clone(),
- ColumnarValue::Scalar(scalar_value) => scalar_value.get_datatype(),
- }
- }
-
- /// Convert a columnar value into an ArrayRef
- pub fn into_array(self, num_rows: usize) -> ArrayRef {
- match self {
- ColumnarValue::Array(array) => array,
- ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
- }
- }
-}
-
/// Expression that can be evaluated against a RecordBatch
/// A Physical expression knows its type, nullability and how to evaluate itself.
pub trait PhysicalExpr: Send + Sync + Display + Debug {
@@ -578,30 +554,6 @@ pub trait WindowExpr: Send + Sync + Debug {
}
}
-/// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and
-/// generically accumulates values.
-///
-/// An accumulator knows how to:
-/// * update its state from inputs via `update_batch`
-/// * convert its internal state to a vector of scalar values
-/// * update its state from multiple accumulators' states via `merge_batch`
-/// * compute the final value from its internal state via `evaluate`
-pub trait Accumulator: Send + Sync + Debug {
- /// Returns the state of the accumulator at the end of the accumulation.
- // in the case of an average on which we track `sum` and `n`, this function should return a vector
- // of two values, sum and n.
- fn state(&self) -> Result<Vec<ScalarValue>>;
-
- /// updates the accumulator's state from a vector of arrays.
- fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
-
- /// updates the accumulator's state from a vector of states.
- fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;
-
- /// returns its value based on its current state.
- fn evaluate(&self) -> Result<ScalarValue>;
-}
-
/// Applies an optional projection to a [`SchemaRef`], returning the
/// projected schema
///
diff --git a/datafusion/src/physical_plan/udaf.rs b/datafusion/src/physical_plan/udaf.rs
index 0de696d..0e61cb4 100644
--- a/datafusion/src/physical_plan/udaf.rs
+++ b/datafusion/src/physical_plan/udaf.rs
@@ -39,72 +39,7 @@ use super::{
};
use std::sync::Arc;
-/// Logical representation of a user-defined aggregate function (UDAF)
-/// A UDAF is different from a UDF in that it is stateful across batches.
-#[derive(Clone)]
-pub struct AggregateUDF {
- /// name
- pub name: String,
- /// signature
- pub signature: Signature,
- /// Return type
- pub return_type: ReturnTypeFunction,
- /// actual implementation
- pub accumulator: AccumulatorFunctionImplementation,
- /// the accumulator's state's description as a function of the return type
- pub state_type: StateTypeFunction,
-}
-
-impl Debug for AggregateUDF {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- f.debug_struct("AggregateUDF")
- .field("name", &self.name)
- .field("signature", &self.signature)
- .field("fun", &"<FUNC>")
- .finish()
- }
-}
-
-impl PartialEq for AggregateUDF {
- fn eq(&self, other: &Self) -> bool {
- self.name == other.name && self.signature == other.signature
- }
-}
-
-impl std::hash::Hash for AggregateUDF {
- fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
- self.name.hash(state);
- self.signature.hash(state);
- }
-}
-
-impl AggregateUDF {
- /// Create a new AggregateUDF
- pub fn new(
- name: &str,
- signature: &Signature,
- return_type: &ReturnTypeFunction,
- accumulator: &AccumulatorFunctionImplementation,
- state_type: &StateTypeFunction,
- ) -> Self {
- Self {
- name: name.to_owned(),
- signature: signature.clone(),
- return_type: return_type.clone(),
- accumulator: accumulator.clone(),
- state_type: state_type.clone(),
- }
- }
-
- /// creates a logical expression with a call of the UDAF
- /// This utility allows using the UDAF without requiring access to the registry.
- pub fn call(&self, args: Vec<Expr>) -> Expr {
- Expr::AggregateUDF {
- fun: Arc::new(self.clone()),
- args,
- }
- }
-}
+pub use datafusion_expr::AggregateUDF;
/// Creates a physical expression of the UDAF, that includes all necessary type coercion.
/// This function errors when `args`' can't be coerced to a valid argument type of the UDAF.
diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion/src/physical_plan/udf.rs
index 7355746..85e6b02 100644
--- a/datafusion/src/physical_plan/udf.rs
+++ b/datafusion/src/physical_plan/udf.rs
@@ -33,74 +33,7 @@ use super::{
};
use std::sync::Arc;
-/// Logical representation of a UDF.
-#[derive(Clone)]
-pub struct ScalarUDF {
- /// name
- pub name: String,
- /// signature
- pub signature: Signature,
- /// Return type
- pub return_type: ReturnTypeFunction,
- /// actual implementation
- ///
- /// The fn param is the wrapped function but be aware that the function will
- /// be passed with the slice / vec of columnar values (either scalar or array)
- /// with the exception of zero param function, where a singular element vec
- /// will be passed. In that case the single element is a null array to indicate
- /// the batch's row count (so that the generative zero-argument function can know
- /// the result array size).
- pub fun: ScalarFunctionImplementation,
-}
-
-impl Debug for ScalarUDF {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- f.debug_struct("ScalarUDF")
- .field("name", &self.name)
- .field("signature", &self.signature)
- .field("fun", &"<FUNC>")
- .finish()
- }
-}
-
-impl PartialEq for ScalarUDF {
- fn eq(&self, other: &Self) -> bool {
- self.name == other.name && self.signature == other.signature
- }
-}
-
-impl std::hash::Hash for ScalarUDF {
- fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
- self.name.hash(state);
- self.signature.hash(state);
- }
-}
-
-impl ScalarUDF {
- /// Create a new ScalarUDF
- pub fn new(
- name: &str,
- signature: &Signature,
- return_type: &ReturnTypeFunction,
- fun: &ScalarFunctionImplementation,
- ) -> Self {
- Self {
- name: name.to_owned(),
- signature: signature.clone(),
- return_type: return_type.clone(),
- fun: fun.clone(),
- }
- }
-
- /// creates a logical expression with a call of the UDF
- /// This utility allows using the UDF without requiring access to the registry.
- pub fn call(&self, args: Vec<Expr>) -> Expr {
- Expr::ScalarUDF {
- fun: Arc::new(self.clone()),
- args,
- }
- }
-}
+pub use datafusion_expr::ScalarUDF;
/// Create a physical expression of the UDF.
/// This function errors when `args`' can't be coerced to a valid argument type of the UDF.