Posted to commits@arrow.apache.org by al...@apache.org on 2021/05/16 10:03:02 UTC

[arrow-datafusion] branch master updated: Use NullArray to Pass row count to ScalarFunctions that take 0 arguments (#328)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c50371  Use NullArray to Pass row count to ScalarFunctions that take 0 arguments  (#328)
1c50371 is described below

commit 1c5037138305f5b6204a1dbbfb17f9521f6995d3
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Sun May 16 18:02:53 2021 +0800

    Use NullArray to Pass row count to ScalarFunctions that take 0 arguments  (#328)
    
    * add docs and comments
    
    * use supports_zero_argument
---
 datafusion/src/physical_plan/functions.rs | 49 ++++++++++++++++++++++++++-----
 datafusion/src/physical_plan/udf.rs       |  7 +++++
 2 files changed, 48 insertions(+), 8 deletions(-)
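
For illustration, a minimal self-contained sketch (not part of this commit) of the
convention the change introduces: for a zero-argument built-in such as now(), the
engine hands the function a single NullArray whose length equals the batch's row
count. The toy schema and batch are hypothetical, and the example assumes the
arrow and datafusion crate paths of this era.

    use std::sync::Arc;
    use arrow::array::{Array, ArrayRef, Int64Array, NullArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion::physical_plan::ColumnarValue;

    fn main() -> datafusion::error::Result<()> {
        // A toy batch with 3 rows.
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
        let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from(vec![1, 2, 3]))];
        let batch = RecordBatch::try_new(schema, columns)?;

        // The convention: instead of an empty argument list, a zero-argument
        // function is handed a NullArray whose length is the batch's row count.
        let placeholder = ColumnarValue::Array(Arc::new(NullArray::new(batch.num_rows())));
        if let ColumnarValue::Array(array) = &placeholder {
            assert_eq!(array.len(), 3);
        }
        Ok(())
    }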

diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index 2e053a8..c0c915f 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -44,13 +44,14 @@ use crate::{
     scalar::ScalarValue,
 };
 use arrow::{
-    array::ArrayRef,
+    array::{ArrayRef, NullArray},
     compute::kernels::length::{bit_length, length},
     datatypes::TimeUnit,
     datatypes::{DataType, Field, Int32Type, Int64Type, Schema},
     record_batch::RecordBatch,
 };
 use fmt::{Debug, Formatter};
+use std::convert::From;
 use std::{any::Any, fmt, str::FromStr, sync::Arc};
 
 /// A function's signature, which defines the function's supported argument types.
@@ -76,6 +77,13 @@ pub enum Signature {
 }
 
 /// Scalar function
+///
+/// The `Fn` param is the wrapped function. The function is passed a slice of
+/// columnar values (each either a scalar or an array), except for
+/// zero-argument functions, which receive a single-element vec. In that case
+/// the single element is a null array whose length indicates the batch's row
+/// count, so that the zero-argument function knows how large its result array
+/// must be.
 pub type ScalarFunctionImplementation =
     Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
 
@@ -207,6 +215,14 @@ pub enum BuiltinScalarFunction {
     RegexpMatch,
 }
 
+impl BuiltinScalarFunction {
+    /// An allowlist of functions that take zero arguments; these get special
+    /// treatment during execution.
+    fn supports_zero_argument(&self) -> bool {
+        matches!(self, BuiltinScalarFunction::Now)
+    }
+}
+
 impl fmt::Display for BuiltinScalarFunction {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         // lowercase of the debug.
@@ -1371,6 +1387,17 @@ impl fmt::Display for ScalarFunctionExpr {
     }
 }
 
+/// A null columnar value is implemented as a null array in order to pass the
+/// batch's num_rows
+type NullColumnarValue = ColumnarValue;
+
+impl From<&RecordBatch> for NullColumnarValue {
+    fn from(batch: &RecordBatch) -> Self {
+        let num_rows = batch.num_rows();
+        ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
+    }
+}
+
 impl PhysicalExpr for ScalarFunctionExpr {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -1386,12 +1413,18 @@ impl PhysicalExpr for ScalarFunctionExpr {
     }
 
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
-        // evaluate the arguments
-        let inputs = self
-            .args
-            .iter()
-            .map(|e| e.evaluate(batch))
-            .collect::<Result<Vec<_>>>()?;
+        // evaluate the arguments; if there are no arguments we instead pass in a
+        // null array whose length indicates the batch size (by convention)
+        let inputs = match (self.args.len(), self.name.parse::<BuiltinScalarFunction>()) {
+            (0, Ok(scalar_fun)) if scalar_fun.supports_zero_argument() => {
+                vec![NullColumnarValue::from(batch)]
+            }
+            _ => self
+                .args
+                .iter()
+                .map(|e| e.evaluate(batch))
+                .collect::<Result<Vec<_>>>()?,
+        };
 
         // evaluate the function
         let fun = self.fun.as_ref();
@@ -1399,7 +1432,7 @@ impl PhysicalExpr for ScalarFunctionExpr {
     }
 }
 
-/// decorates a function to handle [`ScalarValue`]s by coverting them to arrays before calling the function
+/// decorates a function to handle [`ScalarValue`]s by converting them to arrays before calling the function
 /// and vice-versa after evaluation.
 pub fn make_scalar_function<F>(inner: F) -> ScalarFunctionImplementation
 where
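
The doc-comment typo corrected in the hunk above concerns make_scalar_function,
which adapts a function over ArrayRefs into a ScalarFunctionImplementation that
also accepts scalar inputs. A brief sketch of typical usage follows; the negate
function is hypothetical, and the path
datafusion::physical_plan::functions::make_scalar_function is assumed from this
version of the crate.

    use std::sync::Arc;
    use arrow::array::{Array, ArrayRef, Int64Array};
    use datafusion::error::Result;
    use datafusion::physical_plan::functions::make_scalar_function;

    // The inner function only ever sees arrays; the wrapper converts any
    // scalar arguments to arrays before calling it, and back afterwards.
    fn negate(args: &[ArrayRef]) -> Result<ArrayRef> {
        let input = args[0]
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("int64 input");
        // Negate each value, preserving nulls.
        let out: Int64Array = input.iter().map(|v| v.map(|x| -x)).collect();
        Ok(Arc::new(out) as ArrayRef)
    }

    fn main() {
        // The wrapped closure has the ScalarFunctionImplementation signature
        // (&[ColumnarValue]) -> Result<ColumnarValue>.
        let _negate_impl = make_scalar_function(negate);
    }
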
diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion/src/physical_plan/udf.rs
index 9189da4..a79c0a8 100644
--- a/datafusion/src/physical_plan/udf.rs
+++ b/datafusion/src/physical_plan/udf.rs
@@ -43,6 +43,13 @@ pub struct ScalarUDF {
     /// Return type
     pub return_type: ReturnTypeFunction,
     /// actual implementation
+    ///
+    /// The `fun` param is the wrapped function. The function is passed a slice
+    /// of columnar values (each either a scalar or an array), except for
+    /// zero-argument functions, which receive a single-element vec. In that
+    /// case the single element is a null array whose length indicates the
+    /// batch's row count, so that the zero-argument function knows how large
+    /// its result array must be.
     pub fun: ScalarFunctionImplementation,
 }
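
From the implementer's side of the convention documented on the fun field above,
a zero-argument function reads the batch's row count from the null array it
receives. A minimal sketch follows; the function row_numbers is hypothetical and
not part of this commit.

    use std::sync::Arc;
    use arrow::array::{Array, ArrayRef, Int64Array};
    use datafusion::error::Result;
    use datafusion::physical_plan::ColumnarValue;

    // Hypothetical zero-argument implementation: returns one value per row.
    fn row_numbers(args: &[ColumnarValue]) -> Result<ColumnarValue> {
        // By the documented convention, the single element is a null array
        // that only conveys the number of rows in the current batch.
        let num_rows = match args.first() {
            Some(ColumnarValue::Array(array)) => array.len(),
            _ => 1,
        };
        // Size the output to exactly one value per row.
        let values: Vec<i64> = (0..num_rows as i64).collect();
        let result: ArrayRef = Arc::new(Int64Array::from(values));
        Ok(ColumnarValue::Array(result))
    }

Note that in this commit only BuiltinScalarFunction::Now is allowlisted via
supports_zero_argument, so only that built-in actually receives the null array
placeholder at execution time.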