You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink target is not preserved in this plain-text copy).
Posted to commits@arrow.apache.org by al...@apache.org on 2021/05/16 10:03:02 UTC
[arrow-datafusion] branch master updated: Use NullArray to Pass row count to ScalarFunctions that take 0 arguments (#328)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 1c50371 Use NullArray to Pass row count to ScalarFunctions that take 0 arguments (#328)
1c50371 is described below
commit 1c5037138305f5b6204a1dbbfb17f9521f6995d3
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Sun May 16 18:02:53 2021 +0800
Use NullArray to Pass row count to ScalarFunctions that take 0 arguments (#328)
* add docs and comments
* use supports_zero_argument
---
datafusion/src/physical_plan/functions.rs | 49 ++++++++++++++++++++++++++-----
datafusion/src/physical_plan/udf.rs | 7 +++++
2 files changed, 48 insertions(+), 8 deletions(-)
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index 2e053a8..c0c915f 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -44,13 +44,14 @@ use crate::{
scalar::ScalarValue,
};
use arrow::{
- array::ArrayRef,
+ array::{ArrayRef, NullArray},
compute::kernels::length::{bit_length, length},
datatypes::TimeUnit,
datatypes::{DataType, Field, Int32Type, Int64Type, Schema},
record_batch::RecordBatch,
};
use fmt::{Debug, Formatter};
+use std::convert::From;
use std::{any::Any, fmt, str::FromStr, sync::Arc};
/// A function's signature, which defines the function's supported argument types.
@@ -76,6 +77,13 @@ pub enum Signature {
}
/// Scalar function
+///
+/// The Fn param is the wrapped function but be aware that the function will
+/// be passed with the slice / vec of columnar values (either scalar or array)
+/// with the exception of zero param function, where a singular element vec
+/// will be passed. In that case the single element is a null array to indicate
+/// the batch's row count (so that the generative zero-argument function can know
+/// the result array size).
pub type ScalarFunctionImplementation =
Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
@@ -207,6 +215,14 @@ pub enum BuiltinScalarFunction {
RegexpMatch,
}
+impl BuiltinScalarFunction {
+ /// an allowlist of functions to take zero arguments, so that they will get special treatment
+ /// while executing.
+ fn supports_zero_argument(&self) -> bool {
+ matches!(self, BuiltinScalarFunction::Now)
+ }
+}
+
impl fmt::Display for BuiltinScalarFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// lowercase of the debug.
@@ -1371,6 +1387,17 @@ impl fmt::Display for ScalarFunctionExpr {
}
}
+/// null columnar values are implemented as a null array in order to pass batch
+/// num_rows
+type NullColumnarValue = ColumnarValue;
+
+impl From<&RecordBatch> for NullColumnarValue {
+ fn from(batch: &RecordBatch) -> Self {
+ let num_rows = batch.num_rows();
+ ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
+ }
+}
+
impl PhysicalExpr for ScalarFunctionExpr {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -1386,12 +1413,18 @@ impl PhysicalExpr for ScalarFunctionExpr {
}
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
- // evaluate the arguments
- let inputs = self
- .args
- .iter()
- .map(|e| e.evaluate(batch))
- .collect::<Result<Vec<_>>>()?;
+ // evaluate the arguments, if there are no arguments we'll instead pass in a null array
+ // indicating the batch size (as a convention)
+ let inputs = match (self.args.len(), self.name.parse::<BuiltinScalarFunction>()) {
+ (0, Ok(scalar_fun)) if scalar_fun.supports_zero_argument() => {
+ vec![NullColumnarValue::from(batch)]
+ }
+ _ => self
+ .args
+ .iter()
+ .map(|e| e.evaluate(batch))
+ .collect::<Result<Vec<_>>>()?,
+ };
// evaluate the function
let fun = self.fun.as_ref();
@@ -1399,7 +1432,7 @@ impl PhysicalExpr for ScalarFunctionExpr {
}
}
-/// decorates a function to handle [`ScalarValue`]s by coverting them to arrays before calling the function
+/// decorates a function to handle [`ScalarValue`]s by converting them to arrays before calling the function
/// and vice-versa after evaluation.
pub fn make_scalar_function<F>(inner: F) -> ScalarFunctionImplementation
where
diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion/src/physical_plan/udf.rs
index 9189da4..a79c0a8 100644
--- a/datafusion/src/physical_plan/udf.rs
+++ b/datafusion/src/physical_plan/udf.rs
@@ -43,6 +43,13 @@ pub struct ScalarUDF {
/// Return type
pub return_type: ReturnTypeFunction,
/// actual implementation
+ ///
+ /// The fn param is the wrapped function but be aware that the function will
+ /// be passed with the slice / vec of columnar values (either scalar or array)
+ /// with the exception of zero param function, where a singular element vec
+ /// will be passed. In that case the single element is a null array to indicate
+ /// the batch's row count (so that the generative zero-argument function can know
+ /// the result array size).
pub fun: ScalarFunctionImplementation,
}