You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2023/01/27 02:12:20 UTC
[arrow-datafusion-python] branch main updated: Add DataFrame methods for accessing plans (#153)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new 0e74732 Add DataFrame methods for accessing plans (#153)
0e74732 is described below
commit 0e74732635f79e6c0ee86b3e29b64de2fa84a0c8
Author: Andy Grove <an...@gmail.com>
AuthorDate: Thu Jan 26 19:12:16 2023 -0700
Add DataFrame methods for accessing plans (#153)
---
datafusion/tests/test_dataframe.py | 15 +++++++++++++++
src/dataframe.rs | 18 ++++++++++++++++++
src/lib.rs | 2 ++
src/logical.rs | 2 +-
src/{logical.rs => physical_plan.rs} | 32 ++++++++++++++------------------
5 files changed, 50 insertions(+), 19 deletions(-)
diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py
index aac5db2..3ecb836 100644
--- a/datafusion/tests/test_dataframe.py
+++ b/datafusion/tests/test_dataframe.py
@@ -258,6 +258,21 @@ def test_explain(df):
df.explain()
+def test_logical_plan(df):
+ # The binding returns a LogicalPlan or raises, so `is not None` cannot fail; check the type.
+ assert type(df.logical_plan()).__name__ == "LogicalPlan"
+
+
+def test_optimized_logical_plan(df):
+ # Same wrapper class as the unoptimized plan (PyLogicalPlan exposed as "LogicalPlan").
+ assert type(df.optimized_logical_plan()).__name__ == "LogicalPlan"
+
+
+def test_execution_plan(df):
+ # PyExecutionPlan is exposed to Python under the class name "ExecutionPlan".
+ assert type(df.execution_plan()).__name__ == "ExecutionPlan"
+
+
def test_repartition(df):
df.repartition(2)
diff --git a/src/dataframe.rs b/src/dataframe.rs
index caa3958..749b7bd 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+use crate::logical::PyLogicalPlan;
+use crate::physical_plan::PyExecutionPlan;
use crate::utils::wait_for_future;
use crate::{errors::DataFusionError, expression::PyExpr};
use datafusion::arrow::datatypes::Schema;
@@ -202,6 +204,22 @@ impl PyDataFrame {
pretty::print_batches(&batches).map_err(|err| PyArrowException::new_err(err.to_string()))
}
+ /// Get the logical plan for this `DataFrame` as built, before any optimization
+ fn logical_plan(&self) -> PyResult<PyLogicalPlan> {
+ Ok(self.df.logical_plan().clone().into())
+ }
+
+ /// Get the optimized logical plan for this `DataFrame` (optimizes a clone; `self` is untouched)
+ fn optimized_logical_plan(&self) -> PyResult<PyLogicalPlan> {
+ Ok(self.df.as_ref().clone().into_optimized_plan()?.into())
+ }
+
+ /// Create the physical execution plan for this `DataFrame`, waiting on the async planner
+ fn execution_plan(&self, py: Python) -> PyResult<PyExecutionPlan> {
+ let plan = wait_for_future(py, self.df.as_ref().clone().create_physical_plan())?;
+ Ok(plan.into())
+ }
+
/// Repartition a `DataFrame` based on a logical partitioning scheme.
fn repartition(&self, num: usize) -> PyResult<Self> {
let new_df = self
diff --git a/src/lib.rs b/src/lib.rs
index 21b47f4..eda74ff 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -35,6 +35,7 @@ mod expression;
#[allow(clippy::borrow_deref_ref)]
mod functions;
pub mod logical;
+pub mod physical_plan;
mod pyarrow_filter_expression;
pub mod store;
pub mod substrait;
@@ -65,6 +66,7 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<udaf::PyAggregateUDF>()?;
m.add_class::<config::PyConfig>()?;
m.add_class::<logical::PyLogicalPlan>()?;
+ m.add_class::<physical_plan::PyExecutionPlan>()?;
// Register the functions as a submodule
let funcs = PyModule::new(py, "functions")?;
diff --git a/src/logical.rs b/src/logical.rs
index 8c3acf7..304cdf9 100644
--- a/src/logical.rs
+++ b/src/logical.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
use datafusion_expr::LogicalPlan;
use pyo3::prelude::*;
-#[pyclass(name = "LogicalPlan", module = "substrait", subclass)]
+#[pyclass(name = "LogicalPlan", module = "datafusion", subclass)]
#[derive(Debug, Clone)]
pub struct PyLogicalPlan {
pub(crate) plan: Arc<LogicalPlan>,
diff --git a/src/logical.rs b/src/physical_plan.rs
similarity index 58%
copy from src/logical.rs
copy to src/physical_plan.rs
index 8c3acf7..b4c68b9 100644
--- a/src/logical.rs
+++ b/src/physical_plan.rs
@@ -15,36 +15,32 @@
// specific language governing permissions and limitations
// under the License.
+use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;
-use datafusion_expr::LogicalPlan;
use pyo3::prelude::*;
-#[pyclass(name = "LogicalPlan", module = "substrait", subclass)]
+#[pyclass(name = "ExecutionPlan", module = "datafusion", subclass)]
#[derive(Debug, Clone)]
-pub struct PyLogicalPlan {
- pub(crate) plan: Arc<LogicalPlan>,
+pub struct PyExecutionPlan {
+ pub(crate) plan: Arc<dyn ExecutionPlan>,
}
-impl PyLogicalPlan {
- /// creates a new PyLogicalPlan
- pub fn new(plan: LogicalPlan) -> Self {
- Self {
- plan: Arc::new(plan),
- }
+impl PyExecutionPlan {
+ /// Creates a new `PyExecutionPlan` wrapping an already-shared execution plan
+ pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+ Self { plan }
}
}
-impl From<PyLogicalPlan> for LogicalPlan {
- fn from(logical_plan: PyLogicalPlan) -> LogicalPlan {
- logical_plan.plan.as_ref().clone()
+impl From<PyExecutionPlan> for Arc<dyn ExecutionPlan> {
+ fn from(plan: PyExecutionPlan) -> Arc<dyn ExecutionPlan> {
+ plan.plan
}
}
-impl From<LogicalPlan> for PyLogicalPlan {
- fn from(logical_plan: LogicalPlan) -> PyLogicalPlan {
- PyLogicalPlan {
- plan: Arc::new(logical_plan),
- }
+impl From<Arc<dyn ExecutionPlan>> for PyExecutionPlan {
+ fn from(plan: Arc<dyn ExecutionPlan>) -> PyExecutionPlan {
+ PyExecutionPlan { plan }
}
}