You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/19 19:39:01 UTC
[arrow-datafusion] branch master updated: update `python` crate to
support latest pyo3 syntax and gil semantics (#741)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new c51e9ec update `python` crate to support latest pyo3 syntax and gil semantics (#741)
c51e9ec is described below
commit c51e9ece1893c08cb0a605740356d356e9168052
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Tue Jul 20 03:37:43 2021 +0800
update `python` crate to support latest pyo3 syntax and gil semantics (#741)
* update dependencies
* rename macros
* replace deprecated pyo3 APIs
---
python/src/functions.rs | 4 +-
python/src/to_py.rs | 27 +++++-----
python/src/udaf.rs | 136 +++++++++++++++++++++++-------------------------
python/src/udf.rs | 52 +++++++++---------
4 files changed, 103 insertions(+), 116 deletions(-)
diff --git a/python/src/functions.rs b/python/src/functions.rs
index b03004f..9b60fdb 100644
--- a/python/src/functions.rs
+++ b/python/src/functions.rs
@@ -25,7 +25,7 @@ use std::sync::Arc;
/// Expression representing a column on the existing plan.
#[pyfunction]
-#[text_signature = "(name)"]
+#[pyo3(text_signature = "(name)")]
fn col(name: &str) -> expression::Expression {
expression::Expression {
expr: logical_plan::col(name),
@@ -34,7 +34,7 @@ fn col(name: &str) -> expression::Expression {
/// Expression representing a constant value
#[pyfunction]
-#[text_signature = "(value)"]
+#[pyo3(text_signature = "(value)")]
fn lit(value: i32) -> expression::Expression {
expression::Expression {
expr: logical_plan::lit(value),
diff --git a/python/src/to_py.rs b/python/src/to_py.rs
index ff03e03..6bc0581 100644
--- a/python/src/to_py.rs
+++ b/python/src/to_py.rs
@@ -15,15 +15,14 @@
// specific language governing permissions and limitations
// under the License.
+use datafusion::arrow::array::ArrayRef;
+use datafusion::arrow::record_batch::RecordBatch;
use libc::uintptr_t;
use pyo3::prelude::*;
+use pyo3::types::PyList;
use pyo3::PyErr;
-
use std::convert::From;
-use datafusion::arrow::array::ArrayRef;
-use datafusion::arrow::record_batch::RecordBatch;
-
use crate::errors;
pub fn to_py_array(array: &ArrayRef, py: Python) -> PyResult<PyObject> {
@@ -64,15 +63,13 @@ fn to_py_batch<'a>(
/// Converts a &[RecordBatch] into a Vec<RecordBatch> represented in PyArrow
pub fn to_py(batches: &[RecordBatch]) -> PyResult<PyObject> {
- let gil = pyo3::Python::acquire_gil();
- let py = gil.python();
- let pyarrow = PyModule::import(py, "pyarrow")?;
- let builtins = PyModule::import(py, "builtins")?;
-
- let mut py_batches = vec![];
- for batch in batches {
- py_batches.push(to_py_batch(batch, py, pyarrow)?);
- }
- let result = builtins.call1("list", (py_batches,))?;
- Ok(PyObject::from(result))
+ Python::with_gil(|py| {
+ let pyarrow = PyModule::import(py, "pyarrow")?;
+ let mut py_batches = vec![];
+ for batch in batches {
+ py_batches.push(to_py_batch(batch, py, pyarrow)?);
+ }
+ let list = PyList::new(py, py_batches);
+ Ok(PyObject::from(list))
+ })
}
diff --git a/python/src/udaf.rs b/python/src/udaf.rs
index 3ce223d..83e8be0 100644
--- a/python/src/udaf.rs
+++ b/python/src/udaf.rs
@@ -44,18 +44,17 @@ impl PyAccumulator {
impl Accumulator for PyAccumulator {
fn state(&self) -> Result<Vec<datafusion::scalar::ScalarValue>> {
- let gil = pyo3::Python::acquire_gil();
- let py = gil.python();
-
- let state = self
- .accum
- .as_ref(py)
- .call_method0("to_scalars")
- .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?
- .extract::<Vec<Scalar>>()
- .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
-
- Ok(state.into_iter().map(|v| v.scalar).collect::<Vec<_>>())
+ Python::with_gil(|py| {
+ let state = self
+ .accum
+ .as_ref(py)
+ .call_method0("to_scalars")
+ .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?
+ .extract::<Vec<Scalar>>()
+ .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
+
+ Ok(state.into_iter().map(|v| v.scalar).collect::<Vec<_>>())
+ })
}
fn update(&mut self, _values: &[ScalarValue]) -> Result<()> {
@@ -69,66 +68,60 @@ impl Accumulator for PyAccumulator {
}
fn evaluate(&self) -> Result<datafusion::scalar::ScalarValue> {
- // get GIL
- let gil = pyo3::Python::acquire_gil();
- let py = gil.python();
-
- let value = self
- .accum
- .as_ref(py)
- .call_method0("evaluate")
- .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
-
- to_rust_scalar(value)
- .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))
+ Python::with_gil(|py| {
+ let value = self
+ .accum
+ .as_ref(py)
+ .call_method0("evaluate")
+ .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
+
+ to_rust_scalar(value)
+ .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))
+ })
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
- // get GIL
- let gil = pyo3::Python::acquire_gil();
- let py = gil.python();
-
- // 1. cast args to Pyarrow array
- // 2. call function
-
- // 1.
- let py_args = values
- .iter()
- .map(|arg| {
- // remove unwrap
- to_py_array(arg, py).unwrap()
- })
- .collect::<Vec<_>>();
- let py_args = PyTuple::new(py, py_args);
-
- // update accumulator
- self.accum
- .as_ref(py)
- .call_method1("update", py_args)
- .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
-
- Ok(())
+ Python::with_gil(|py| {
+ // 1. cast args to Pyarrow array
+ // 2. call function
+
+ // 1.
+ let py_args = values
+ .iter()
+ .map(|arg| {
+ // remove unwrap
+ to_py_array(arg, py).unwrap()
+ })
+ .collect::<Vec<_>>();
+ let py_args = PyTuple::new(py, py_args);
+
+ // update accumulator
+ self.accum
+ .as_ref(py)
+ .call_method1("update", py_args)
+ .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
+
+ Ok(())
+ })
}
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
- // get GIL
- let gil = pyo3::Python::acquire_gil();
- let py = gil.python();
-
- // 1. cast states to Pyarrow array
- // 2. merge
- let state = &states[0];
-
- let state = to_py_array(state, py)
- .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
-
- // 2.
- self.accum
- .as_ref(py)
- .call_method1("merge", (state,))
- .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
-
- Ok(())
+ Python::with_gil(|py| {
+ // 1. cast states to Pyarrow array
+ // 2. merge
+ let state = &states[0];
+
+ let state = to_py_array(state, py)
+ .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
+
+ // 2.
+ self.accum
+ .as_ref(py)
+ .call_method1("merge", (state,))
+ .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
+
+ Ok(())
+ })
}
}
@@ -136,12 +129,11 @@ pub fn array_udaf(
accumulator: PyObject,
) -> Arc<dyn Fn() -> Result<Box<dyn Accumulator>> + Send + Sync> {
Arc::new(move || -> Result<Box<dyn Accumulator>> {
- let gil = pyo3::Python::acquire_gil();
- let py = gil.python();
-
- let accumulator = accumulator
- .call0(py)
- .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
+ let accumulator = Python::with_gil(|py| {
+ accumulator
+ .call0(py)
+ .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))
+ })?;
Ok(Box::new(PyAccumulator::new(accumulator)))
})
}
diff --git a/python/src/udf.rs b/python/src/udf.rs
index 7fee710..49a18d9 100644
--- a/python/src/udf.rs
+++ b/python/src/udf.rs
@@ -30,33 +30,31 @@ use crate::to_rust::to_rust;
pub fn array_udf(func: PyObject) -> ScalarFunctionImplementation {
make_scalar_function(
move |args: &[array::ArrayRef]| -> Result<array::ArrayRef, DataFusionError> {
- // get GIL
- let gil = pyo3::Python::acquire_gil();
- let py = gil.python();
-
- // 1. cast args to Pyarrow arrays
- // 2. call function
- // 3. cast to arrow::array::Array
-
- // 1.
- let py_args = args
- .iter()
- .map(|arg| {
- // remove unwrap
- to_py_array(arg, py).unwrap()
- })
- .collect::<Vec<_>>();
- let py_args = PyTuple::new(py, py_args);
-
- // 2.
- let value = func.as_ref(py).call(py_args, None);
- let value = match value {
- Ok(n) => Ok(n),
- Err(error) => Err(DataFusionError::Execution(format!("{:?}", error))),
- }?;
-
- let array = to_rust(value).unwrap();
- Ok(array)
+ Python::with_gil(|py| {
+ // 1. cast args to Pyarrow arrays
+ // 2. call function
+ // 3. cast to arrow::array::Array
+
+ // 1.
+ let py_args = args
+ .iter()
+ .map(|arg| {
+ // remove unwrap
+ to_py_array(arg, py).unwrap()
+ })
+ .collect::<Vec<_>>();
+ let py_args = PyTuple::new(py, py_args);
+
+ // 2.
+ let value = func.as_ref(py).call(py_args, None);
+ let value = match value {
+ Ok(n) => Ok(n),
+ Err(error) => Err(DataFusionError::Execution(format!("{:?}", error))),
+ }?;
+
+ let array = to_rust(value).unwrap();
+ Ok(array)
+ })
},
)
}