You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2023/01/27 02:12:20 UTC

[arrow-datafusion-python] branch main updated: Add DataFrame methods for accessing plans (#153)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e74732  Add DataFrame methods for accessing plans (#153)
0e74732 is described below

commit 0e74732635f79e6c0ee86b3e29b64de2fa84a0c8
Author: Andy Grove <an...@gmail.com>
AuthorDate: Thu Jan 26 19:12:16 2023 -0700

    Add DataFrame methods for accessing plans (#153)
---
 datafusion/tests/test_dataframe.py   | 15 +++++++++++++++
 src/dataframe.rs                     | 18 ++++++++++++++++++
 src/lib.rs                           |  2 ++
 src/logical.rs                       |  2 +-
 src/{logical.rs => physical_plan.rs} | 32 ++++++++++++++------------------
 5 files changed, 50 insertions(+), 19 deletions(-)

diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py
index aac5db2..3ecb836 100644
--- a/datafusion/tests/test_dataframe.py
+++ b/datafusion/tests/test_dataframe.py
@@ -258,6 +258,21 @@ def test_explain(df):
     df.explain()
 
 
+def test_logical_plan(df):
+    plan = df.logical_plan()
+    assert plan is not None
+
+
+def test_optimized_logical_plan(df):
+    plan = df.optimized_logical_plan()
+    assert plan is not None
+
+
+def test_execution_plan(df):
+    plan = df.execution_plan()
+    assert plan is not None
+
+
 def test_repartition(df):
     df.repartition(2)
 
diff --git a/src/dataframe.rs b/src/dataframe.rs
index caa3958..749b7bd 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::logical::PyLogicalPlan;
+use crate::physical_plan::PyExecutionPlan;
 use crate::utils::wait_for_future;
 use crate::{errors::DataFusionError, expression::PyExpr};
 use datafusion::arrow::datatypes::Schema;
@@ -202,6 +204,22 @@ impl PyDataFrame {
         pretty::print_batches(&batches).map_err(|err| PyArrowException::new_err(err.to_string()))
     }
 
+    /// Get the logical plan for this `DataFrame` as built, without running the optimizer
+    fn logical_plan(&self) -> PyResult<PyLogicalPlan> {
+        Ok(self.df.as_ref().logical_plan().clone().into()) // optimized form: `optimized_logical_plan`
+    }
+
+    /// Return the optimized form of this `DataFrame`'s logical plan
+    fn optimized_logical_plan(&self) -> PyResult<PyLogicalPlan> {
+        Ok(PyLogicalPlan::new(self.df.as_ref().clone().into_optimized_plan()?))
+    }
+
+    /// Create the execution plan for this `DataFrame`, resolving the planning future via `wait_for_future`
+    fn execution_plan(&self, py: Python) -> PyResult<PyExecutionPlan> {
+        let planning = self.df.as_ref().clone().create_physical_plan();
+        Ok(PyExecutionPlan::new(wait_for_future(py, planning)?))
+    }
+
     /// Repartition a `DataFrame` based on a logical partitioning scheme.
     fn repartition(&self, num: usize) -> PyResult<Self> {
         let new_df = self
diff --git a/src/lib.rs b/src/lib.rs
index 21b47f4..eda74ff 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -35,6 +35,7 @@ mod expression;
 #[allow(clippy::borrow_deref_ref)]
 mod functions;
 pub mod logical;
+pub mod physical_plan;
 mod pyarrow_filter_expression;
 pub mod store;
 pub mod substrait;
@@ -65,6 +66,7 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> {
     m.add_class::<udaf::PyAggregateUDF>()?;
     m.add_class::<config::PyConfig>()?;
     m.add_class::<logical::PyLogicalPlan>()?;
+    m.add_class::<physical_plan::PyExecutionPlan>()?;
 
     // Register the functions as a submodule
     let funcs = PyModule::new(py, "functions")?;
diff --git a/src/logical.rs b/src/logical.rs
index 8c3acf7..304cdf9 100644
--- a/src/logical.rs
+++ b/src/logical.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use datafusion_expr::LogicalPlan;
 use pyo3::prelude::*;
 
-#[pyclass(name = "LogicalPlan", module = "substrait", subclass)]
+#[pyclass(name = "LogicalPlan", module = "datafusion", subclass)]
 #[derive(Debug, Clone)]
 pub struct PyLogicalPlan {
     pub(crate) plan: Arc<LogicalPlan>,
diff --git a/src/logical.rs b/src/physical_plan.rs
similarity index 58%
copy from src/logical.rs
copy to src/physical_plan.rs
index 8c3acf7..b4c68b9 100644
--- a/src/logical.rs
+++ b/src/physical_plan.rs
@@ -15,36 +15,32 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::physical_plan::ExecutionPlan;
 use std::sync::Arc;
 
-use datafusion_expr::LogicalPlan;
 use pyo3::prelude::*;
 
-#[pyclass(name = "LogicalPlan", module = "substrait", subclass)]
+#[pyclass(name = "ExecutionPlan", module = "datafusion", subclass)]
 #[derive(Debug, Clone)]
-pub struct PyLogicalPlan {
-    pub(crate) plan: Arc<LogicalPlan>,
+pub struct PyExecutionPlan {
+    pub(crate) plan: Arc<dyn ExecutionPlan>, // reference-counted handle to the wrapped plan
 }
 
-impl PyLogicalPlan {
-    /// creates a new PyLogicalPlan
-    pub fn new(plan: LogicalPlan) -> Self {
-        Self {
-            plan: Arc::new(plan),
-        }
+impl PyExecutionPlan {
+    /// Creates a new `PyExecutionPlan` that wraps the given `Arc<dyn ExecutionPlan>`
+    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+        Self { plan }
     }
 }
 
-impl From<PyLogicalPlan> for LogicalPlan {
-    fn from(logical_plan: PyLogicalPlan) -> LogicalPlan {
-        logical_plan.plan.as_ref().clone()
+impl From<PyExecutionPlan> for Arc<dyn ExecutionPlan> {
+    fn from(plan: PyExecutionPlan) -> Arc<dyn ExecutionPlan> {
+        plan.plan // the wrapper is owned here, so move its `Arc` out instead of cloning it
     }
 }
 
-impl From<LogicalPlan> for PyLogicalPlan {
-    fn from(logical_plan: LogicalPlan) -> PyLogicalPlan {
-        PyLogicalPlan {
-            plan: Arc::new(logical_plan),
-        }
+impl From<Arc<dyn ExecutionPlan>> for PyExecutionPlan {
+    fn from(plan: Arc<dyn ExecutionPlan>) -> PyExecutionPlan {
+        PyExecutionPlan { plan } // the `Arc` is already owned; move it rather than clone it
     }
 }