You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/01/09 16:27:47 UTC
[arrow-datafusion] branch master updated: Add DataFrame::into_view instead of implementing TableProvider (#2659) (#4778)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new c5e2594e9 Add DataFrame::into_view instead of implementing TableProvider (#2659) (#4778)
c5e2594e9 is described below
commit c5e2594e99b01c12d4f6903cb998a62a5479455c
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon Jan 9 17:27:41 2023 +0100
Add DataFrame::into_view instead of implementing TableProvider (#2659) (#4778)
---
datafusion/core/src/dataframe.rs | 41 ++++++++++++++++-------------
datafusion/core/src/datasource/view.rs | 4 +--
datafusion/expr/src/logical_plan/builder.rs | 10 +++++++
3 files changed, 35 insertions(+), 20 deletions(-)
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index fe417593a..e9773dbdf 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -528,6 +528,15 @@ impl DataFrame {
self.session_state.optimize(&self.plan)
}
+ /// Converts this [`DataFrame`] into a [`TableProvider`] that can be registered
+ /// as a table view using [`SessionContext::register_table`].
+ ///
+ /// Note: This discards the [`SessionState`] associated with this
+ /// [`DataFrame`] in favour of the one passed to [`TableProvider::scan`]
+ pub fn into_view(self) -> Arc<dyn TableProvider> {
+ Arc::new(DataFrameTableProvider { plan: self.plan })
+ }
+
/// Return the optimized logical plan represented by this DataFrame.
///
/// Note: This method should not be used outside testing, as it loses the snapshot
@@ -766,9 +775,12 @@ impl DataFrame {
}
}
-// TODO: This will introduce a ref cycle (#2659)
+struct DataFrameTableProvider {
+ plan: LogicalPlan,
+}
+
#[async_trait]
-impl TableProvider for DataFrame {
+impl TableProvider for DataFrameTableProvider {
fn as_any(&self) -> &dyn Any {
self
}
@@ -796,20 +808,14 @@ impl TableProvider for DataFrame {
async fn scan(
&self,
- _state: &SessionState,
+ state: &SessionState,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
- let mut expr = self.clone();
+ let mut expr = LogicalPlanBuilder::from(self.plan.clone());
if let Some(p) = projection {
- let schema = TableProvider::schema(&expr).project(p)?;
- let names = schema
- .fields()
- .iter()
- .map(|field| field.name().as_str())
- .collect::<Vec<_>>();
- expr = expr.select_columns(names.as_slice())?;
+ expr = expr.select(p.iter().copied())?
}
// Add filter when given
@@ -817,13 +823,12 @@ impl TableProvider for DataFrame {
if let Some(filter) = filter {
expr = expr.filter(filter)?
}
+ // add a limit if given
if let Some(l) = limit {
expr = expr.limit(0, Some(l))?
}
- // add a limit if given
- Self::new(self.session_state.clone(), expr.plan)
- .create_physical_plan()
- .await
+ let plan = expr.build()?;
+ state.create_physical_plan(&plan).await
}
}
@@ -1098,7 +1103,7 @@ mod tests {
let df_impl = DataFrame::new(ctx.state(), df.plan.clone());
// register a dataframe as a table
- ctx.register_table("test_table", Arc::new(df_impl.clone()))?;
+ ctx.register_table("test_table", df_impl.clone().into_view())?;
// pull the table out
let table = ctx.table("test_table").await?;
@@ -1297,7 +1302,7 @@ mod tests {
let df = test_table().await?.select_columns(&["c1", "c2", "c3"])?;
let ctx = SessionContext::new();
- let table = Arc::new(df);
+ let table = df.into_view();
ctx.register_table("t1", table.clone())?;
ctx.register_table("t2", table)?;
let df = ctx
@@ -1386,7 +1391,7 @@ mod tests {
)
.await?;
- ctx.register_table("t1", Arc::new(ctx.table("test").await?))?;
+ ctx.register_table("t1", ctx.table("test").await?.into_view())?;
let df = ctx
.table("t1")
diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs
index 2d2f33dc2..524ad9f5c 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -428,7 +428,7 @@ mod tests {
)
.await?;
- ctx.register_table("t1", Arc::new(ctx.table("test").await?))?;
+ ctx.register_table("t1", ctx.table("test").await?.into_view())?;
ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?;
@@ -458,7 +458,7 @@ mod tests {
)
.await?;
- ctx.register_table("t1", Arc::new(ctx.table("test").await?))?;
+ ctx.register_table("t1", ctx.table("test").await?.into_view())?;
ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?;
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 63783a110..428a31f14 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -288,6 +288,16 @@ impl LogicalPlanBuilder {
Ok(Self::from(project(self.plan, expr)?))
}
+ /// Select the given column indices
+ pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
+ let fields = self.plan.schema().fields();
+ let exprs: Vec<_> = indices
+ .into_iter()
+ .map(|x| Expr::Column(fields[x].qualified_column()))
+ .collect();
+ self.project(exprs)
+ }
+
/// Apply a filter
pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
let expr = normalize_col(expr.into(), &self.plan)?;