You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/08 14:01:33 UTC

[arrow-datafusion] 01/05: pyarrow

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-udf-udaf-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 6c312cc6d17fb0663f850b8ced2b50b6b49a1875
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 13:29:20 2022 +0800

    pyarrow
---
 datafusion-common/src/pyarrow.rs          |  6 ----
 datafusion-expr/src/accumulator.rs        | 44 +++++++++++++++++++++++
 datafusion-expr/src/columnar_value.rs     | 60 +++++++++++++++++++++++++++++++
 datafusion-expr/src/lib.rs                |  2 ++
 datafusion/src/physical_plan/functions.rs | 14 ++------
 datafusion/src/physical_plan/mod.rs       | 52 ++-------------------------
 6 files changed, 110 insertions(+), 68 deletions(-)

diff --git a/datafusion-common/src/pyarrow.rs b/datafusion-common/src/pyarrow.rs
index bf10b45..1ee88d7 100644
--- a/datafusion-common/src/pyarrow.rs
+++ b/datafusion-common/src/pyarrow.rs
@@ -25,12 +25,6 @@ use pyo3::prelude::PyErr;
 use pyo3::types::PyList;
 use pyo3::{FromPyObject, IntoPy, PyAny, PyObject, PyResult, Python};
 
-impl From<DataFusionError> for PyErr {
-    fn from(err: DataFusionError) -> PyErr {
-        PyException::new_err(err.to_string())
-    }
-}
-
 impl PyArrowConvert for ScalarValue {
     fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
         let py = value.py();
diff --git a/datafusion-expr/src/accumulator.rs b/datafusion-expr/src/accumulator.rs
new file mode 100644
index 0000000..599bd36
--- /dev/null
+++ b/datafusion-expr/src/accumulator.rs
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::ArrayRef;
+use datafusion_common::{Result, ScalarValue};
+use std::fmt::Debug;
+
+/// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and
+/// generically accumulates values.
+///
+/// An accumulator knows how to:
+/// * update its state from inputs via `update_batch`
+/// * convert its internal state to a vector of scalar values via `state`
+/// * update its state from multiple accumulators' states via `merge_batch`
+/// * compute the final value from its internal state via `evaluate`
+pub trait Accumulator: Send + Sync + Debug {
+    /// Returns the state of the accumulator at the end of the accumulation.
+    /// For example, an average that tracks `sum` and `n` should return a vector
+    /// of two values: `sum` and `n`.
+    fn state(&self) -> Result<Vec<ScalarValue>>;
+
+    /// Updates the accumulator's state from a slice of input arrays.
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
+
+    /// Updates the accumulator's state from the states of other accumulators.
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;
+
+    /// Returns the accumulator's value based on its current state.
+    fn evaluate(&self) -> Result<ScalarValue>;
+}
diff --git a/datafusion-expr/src/columnar_value.rs b/datafusion-expr/src/columnar_value.rs
new file mode 100644
index 0000000..5e6959d
--- /dev/null
+++ b/datafusion-expr/src/columnar_value.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::ArrayRef;
+use arrow::array::NullArray;
+use arrow::datatypes::DataType;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::ScalarValue;
+use std::sync::Arc;
+
+/// Represents the result from an expression
+#[derive(Clone)]
+pub enum ColumnarValue {
+    /// Array of values
+    Array(ArrayRef),
+    /// A single value
+    Scalar(ScalarValue),
+}
+
+impl ColumnarValue {
+    pub fn data_type(&self) -> DataType {
+        match self {
+            ColumnarValue::Array(array_value) => array_value.data_type().clone(),
+            ColumnarValue::Scalar(scalar_value) => scalar_value.get_datatype(),
+        }
+    }
+
+    /// Convert a columnar value into an ArrayRef
+    pub fn into_array(self, num_rows: usize) -> ArrayRef {
+        match self {
+            ColumnarValue::Array(array) => array,
+            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
+        }
+    }
+}
+
+/// Null columnar values are represented as a null array so that the batch's
+/// row count (`num_rows`) is carried along with the value.
+pub type NullColumnarValue = ColumnarValue;
+
+impl From<&RecordBatch> for NullColumnarValue {
+    fn from(batch: &RecordBatch) -> Self {
+        let num_rows = batch.num_rows();
+        ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
+    }
+}
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
index 7dcddc3..eb86b0b 100644
--- a/datafusion-expr/src/lib.rs
+++ b/datafusion-expr/src/lib.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod accumulator;
 mod aggregate_function;
 mod built_in_function;
 mod operator;
@@ -22,6 +23,7 @@ mod signature;
 mod window_frame;
 mod window_function;
 
+pub use accumulator::Accumulator;
 pub use aggregate_function::AggregateFunction;
 pub use built_in_function::BuiltinScalarFunction;
 pub use operator::Operator;
diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index 9582eec..56e8f17 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -46,12 +46,13 @@ use crate::{
     scalar::ScalarValue,
 };
 use arrow::{
-    array::{ArrayRef, NullArray},
+    array::ArrayRef,
     compute::kernels::length::{bit_length, length},
     datatypes::TimeUnit,
     datatypes::{DataType, Field, Int32Type, Int64Type, Schema},
     record_batch::RecordBatch,
 };
+pub use datafusion_expr::NullColumnarValue;
 use fmt::{Debug, Formatter};
 use std::convert::From;
 use std::{any::Any, fmt, sync::Arc};
@@ -1206,17 +1207,6 @@ impl fmt::Display for ScalarFunctionExpr {
     }
 }
 
-/// null columnar values are implemented as a null array in order to pass batch
-/// num_rows
-type NullColumnarValue = ColumnarValue;
-
-impl From<&RecordBatch> for NullColumnarValue {
-    fn from(batch: &RecordBatch) -> Self {
-        let num_rows = batch.num_rows();
-        ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
-    }
-}
-
 impl PhysicalExpr for ScalarFunctionExpr {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index ac70f2f..38a19db 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -35,6 +35,8 @@ use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 use arrow::{array::ArrayRef, datatypes::Field};
 use async_trait::async_trait;
+pub use datafusion_expr::Accumulator;
+pub use datafusion_expr::ColumnarValue;
 pub use display::DisplayFormatType;
 use futures::stream::Stream;
 use std::fmt;
@@ -419,32 +421,6 @@ pub enum Distribution {
     HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
 }
 
-/// Represents the result from an expression
-#[derive(Clone)]
-pub enum ColumnarValue {
-    /// Array of values
-    Array(ArrayRef),
-    /// A single value
-    Scalar(ScalarValue),
-}
-
-impl ColumnarValue {
-    fn data_type(&self) -> DataType {
-        match self {
-            ColumnarValue::Array(array_value) => array_value.data_type().clone(),
-            ColumnarValue::Scalar(scalar_value) => scalar_value.get_datatype(),
-        }
-    }
-
-    /// Convert a columnar value into an ArrayRef
-    pub fn into_array(self, num_rows: usize) -> ArrayRef {
-        match self {
-            ColumnarValue::Array(array) => array,
-            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
-        }
-    }
-}
-
 /// Expression that can be evaluated against a RecordBatch
 /// A Physical expression knows its type, nullability and how to evaluate itself.
 pub trait PhysicalExpr: Send + Sync + Display + Debug {
@@ -578,30 +554,6 @@ pub trait WindowExpr: Send + Sync + Debug {
     }
 }
 
-/// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and
-/// generically accumulates values.
-///
-/// An accumulator knows how to:
-/// * update its state from inputs via `update_batch`
-/// * convert its internal state to a vector of scalar values
-/// * update its state from multiple accumulators' states via `merge_batch`
-/// * compute the final value from its internal state via `evaluate`
-pub trait Accumulator: Send + Sync + Debug {
-    /// Returns the state of the accumulator at the end of the accumulation.
-    // in the case of an average on which we track `sum` and `n`, this function should return a vector
-    // of two values, sum and n.
-    fn state(&self) -> Result<Vec<ScalarValue>>;
-
-    /// updates the accumulator's state from a vector of arrays.
-    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
-
-    /// updates the accumulator's state from a vector of states.
-    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;
-
-    /// returns its value based on its current state.
-    fn evaluate(&self) -> Result<ScalarValue>;
-}
-
 /// Applies an optional projection to a [`SchemaRef`], returning the
 /// projected schema
 ///