You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/19 19:39:01 UTC

[arrow-datafusion] branch master updated: update `python` crate to support latest pyo3 syntax and gil semantics (#741)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new c51e9ec  update `python` crate to support latest pyo3 syntax and gil semantics (#741)
c51e9ec is described below

commit c51e9ece1893c08cb0a605740356d356e9168052
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Tue Jul 20 03:37:43 2021 +0800

    update `python` crate to support latest pyo3 syntax and gil semantics (#741)
    
    * update dependencies
    
    * rename macros
    
    * replace deprecated pyo3 APIs
---
 python/src/functions.rs |   4 +-
 python/src/to_py.rs     |  27 +++++-----
 python/src/udaf.rs      | 136 +++++++++++++++++++++++-------------------------
 python/src/udf.rs       |  52 +++++++++---------
 4 files changed, 103 insertions(+), 116 deletions(-)

diff --git a/python/src/functions.rs b/python/src/functions.rs
index b03004f..9b60fdb 100644
--- a/python/src/functions.rs
+++ b/python/src/functions.rs
@@ -25,7 +25,7 @@ use std::sync::Arc;
 
 /// Expression representing a column on the existing plan.
 #[pyfunction]
-#[text_signature = "(name)"]
+#[pyo3(text_signature = "(name)")]
 fn col(name: &str) -> expression::Expression {
     expression::Expression {
         expr: logical_plan::col(name),
@@ -34,7 +34,7 @@ fn col(name: &str) -> expression::Expression {
 
 /// Expression representing a constant value
 #[pyfunction]
-#[text_signature = "(value)"]
+#[pyo3(text_signature = "(value)")]
 fn lit(value: i32) -> expression::Expression {
     expression::Expression {
         expr: logical_plan::lit(value),
diff --git a/python/src/to_py.rs b/python/src/to_py.rs
index ff03e03..6bc0581 100644
--- a/python/src/to_py.rs
+++ b/python/src/to_py.rs
@@ -15,15 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::arrow::array::ArrayRef;
+use datafusion::arrow::record_batch::RecordBatch;
 use libc::uintptr_t;
 use pyo3::prelude::*;
+use pyo3::types::PyList;
 use pyo3::PyErr;
-
 use std::convert::From;
 
-use datafusion::arrow::array::ArrayRef;
-use datafusion::arrow::record_batch::RecordBatch;
-
 use crate::errors;
 
 pub fn to_py_array(array: &ArrayRef, py: Python) -> PyResult<PyObject> {
@@ -64,15 +63,13 @@ fn to_py_batch<'a>(
 
 /// Converts a &[RecordBatch] into a Vec<RecordBatch> represented in PyArrow
 pub fn to_py(batches: &[RecordBatch]) -> PyResult<PyObject> {
-    let gil = pyo3::Python::acquire_gil();
-    let py = gil.python();
-    let pyarrow = PyModule::import(py, "pyarrow")?;
-    let builtins = PyModule::import(py, "builtins")?;
-
-    let mut py_batches = vec![];
-    for batch in batches {
-        py_batches.push(to_py_batch(batch, py, pyarrow)?);
-    }
-    let result = builtins.call1("list", (py_batches,))?;
-    Ok(PyObject::from(result))
+    Python::with_gil(|py| {
+        let pyarrow = PyModule::import(py, "pyarrow")?;
+        let mut py_batches = vec![];
+        for batch in batches {
+            py_batches.push(to_py_batch(batch, py, pyarrow)?);
+        }
+        let list = PyList::new(py, py_batches);
+        Ok(PyObject::from(list))
+    })
 }
diff --git a/python/src/udaf.rs b/python/src/udaf.rs
index 3ce223d..83e8be0 100644
--- a/python/src/udaf.rs
+++ b/python/src/udaf.rs
@@ -44,18 +44,17 @@ impl PyAccumulator {
 
 impl Accumulator for PyAccumulator {
     fn state(&self) -> Result<Vec<datafusion::scalar::ScalarValue>> {
-        let gil = pyo3::Python::acquire_gil();
-        let py = gil.python();
-
-        let state = self
-            .accum
-            .as_ref(py)
-            .call_method0("to_scalars")
-            .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?
-            .extract::<Vec<Scalar>>()
-            .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
-
-        Ok(state.into_iter().map(|v| v.scalar).collect::<Vec<_>>())
+        Python::with_gil(|py| {
+            let state = self
+                .accum
+                .as_ref(py)
+                .call_method0("to_scalars")
+                .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?
+                .extract::<Vec<Scalar>>()
+                .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
+
+            Ok(state.into_iter().map(|v| v.scalar).collect::<Vec<_>>())
+        })
     }
 
     fn update(&mut self, _values: &[ScalarValue]) -> Result<()> {
@@ -69,66 +68,60 @@ impl Accumulator for PyAccumulator {
     }
 
     fn evaluate(&self) -> Result<datafusion::scalar::ScalarValue> {
-        // get GIL
-        let gil = pyo3::Python::acquire_gil();
-        let py = gil.python();
-
-        let value = self
-            .accum
-            .as_ref(py)
-            .call_method0("evaluate")
-            .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
-
-        to_rust_scalar(value)
-            .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))
+        Python::with_gil(|py| {
+            let value = self
+                .accum
+                .as_ref(py)
+                .call_method0("evaluate")
+                .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
+
+            to_rust_scalar(value)
+                .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))
+        })
     }
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        // get GIL
-        let gil = pyo3::Python::acquire_gil();
-        let py = gil.python();
-
-        // 1. cast args to Pyarrow array
-        // 2. call function
-
-        // 1.
-        let py_args = values
-            .iter()
-            .map(|arg| {
-                // remove unwrap
-                to_py_array(arg, py).unwrap()
-            })
-            .collect::<Vec<_>>();
-        let py_args = PyTuple::new(py, py_args);
-
-        // update accumulator
-        self.accum
-            .as_ref(py)
-            .call_method1("update", py_args)
-            .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
-
-        Ok(())
+        Python::with_gil(|py| {
+            // 1. cast args to Pyarrow array
+            // 2. call function
+
+            // 1.
+            let py_args = values
+                .iter()
+                .map(|arg| {
+                    // remove unwrap
+                    to_py_array(arg, py).unwrap()
+                })
+                .collect::<Vec<_>>();
+            let py_args = PyTuple::new(py, py_args);
+
+            // update accumulator
+            self.accum
+                .as_ref(py)
+                .call_method1("update", py_args)
+                .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
+
+            Ok(())
+        })
     }
 
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        // get GIL
-        let gil = pyo3::Python::acquire_gil();
-        let py = gil.python();
-
-        // 1. cast states to Pyarrow array
-        // 2. merge
-        let state = &states[0];
-
-        let state = to_py_array(state, py)
-            .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
-
-        // 2.
-        self.accum
-            .as_ref(py)
-            .call_method1("merge", (state,))
-            .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
-
-        Ok(())
+        Python::with_gil(|py| {
+            // 1. cast states to Pyarrow array
+            // 2. merge
+            let state = &states[0];
+
+            let state = to_py_array(state, py)
+                .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
+
+            // 2.
+            self.accum
+                .as_ref(py)
+                .call_method1("merge", (state,))
+                .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
+
+            Ok(())
+        })
     }
 }
 
@@ -136,12 +129,11 @@ pub fn array_udaf(
     accumulator: PyObject,
 ) -> Arc<dyn Fn() -> Result<Box<dyn Accumulator>> + Send + Sync> {
     Arc::new(move || -> Result<Box<dyn Accumulator>> {
-        let gil = pyo3::Python::acquire_gil();
-        let py = gil.python();
-
-        let accumulator = accumulator
-            .call0(py)
-            .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
+        let accumulator = Python::with_gil(|py| {
+            accumulator
+                .call0(py)
+                .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))
+        })?;
         Ok(Box::new(PyAccumulator::new(accumulator)))
     })
 }
diff --git a/python/src/udf.rs b/python/src/udf.rs
index 7fee710..49a18d9 100644
--- a/python/src/udf.rs
+++ b/python/src/udf.rs
@@ -30,33 +30,31 @@ use crate::to_rust::to_rust;
 pub fn array_udf(func: PyObject) -> ScalarFunctionImplementation {
     make_scalar_function(
         move |args: &[array::ArrayRef]| -> Result<array::ArrayRef, DataFusionError> {
-            // get GIL
-            let gil = pyo3::Python::acquire_gil();
-            let py = gil.python();
-
-            // 1. cast args to Pyarrow arrays
-            // 2. call function
-            // 3. cast to arrow::array::Array
-
-            // 1.
-            let py_args = args
-                .iter()
-                .map(|arg| {
-                    // remove unwrap
-                    to_py_array(arg, py).unwrap()
-                })
-                .collect::<Vec<_>>();
-            let py_args = PyTuple::new(py, py_args);
-
-            // 2.
-            let value = func.as_ref(py).call(py_args, None);
-            let value = match value {
-                Ok(n) => Ok(n),
-                Err(error) => Err(DataFusionError::Execution(format!("{:?}", error))),
-            }?;
-
-            let array = to_rust(value).unwrap();
-            Ok(array)
+            Python::with_gil(|py| {
+                // 1. cast args to Pyarrow arrays
+                // 2. call function
+                // 3. cast to arrow::array::Array
+
+                // 1.
+                let py_args = args
+                    .iter()
+                    .map(|arg| {
+                        // remove unwrap
+                        to_py_array(arg, py).unwrap()
+                    })
+                    .collect::<Vec<_>>();
+                let py_args = PyTuple::new(py, py_args);
+
+                // 2.
+                let value = func.as_ref(py).call(py_args, None);
+                let value = match value {
+                    Ok(n) => Ok(n),
+                    Err(error) => Err(DataFusionError::Execution(format!("{:?}", error))),
+                }?;
+
+                let array = to_rust(value).unwrap();
+                Ok(array)
+            })
         },
     )
 }