You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2022/10/22 15:52:10 UTC
[arrow-datafusion] branch master updated: Support inlining view / dataframes logical plan (#3923)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 318b4ad56 Support inlining view / dataframes logical plan (#3923)
318b4ad56 is described below
commit 318b4ad56c826b317b0201f7031a47609ad215b5
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Sat Oct 22 17:52:04 2022 +0200
Support inlining view / dataframes logical plan (#3923)
* Inline TableScans for views and dataframes
* Inline TableScans for views and dataframes
* Inline TableScans for views and dataframes
* WIP
* WIP
* WIP
* WIP
* WIP
* WIP
* fmt
* doc
* Fix test
* Simplify
* Fix
* Rename test source
* Use plan instead of projected schema
* Docs
* Use SubqueryAlias
* Revert "Use SubqueryAlias"
This reverts commit 207c2a967296f1b124a82d85220543b574313b34.
* WIP
* Fix issue
* Clippy
* Format
---
benchmarks/expected-plans/q15.txt | 14 +-
datafusion/core/src/dataframe.rs | 12 +-
datafusion/core/src/datasource/datasource.rs | 6 +
.../core/src/datasource/default_table_source.rs | 4 +
datafusion/core/src/datasource/view.rs | 4 +
datafusion/expr/src/table_source.rs | 7 +-
datafusion/optimizer/src/inline_table_scan.rs | 180 +++++++++++++++++++++
datafusion/optimizer/src/lib.rs | 1 +
datafusion/optimizer/src/optimizer.rs | 2 +
datafusion/optimizer/src/projection_push_down.rs | 4 +-
10 files changed, 228 insertions(+), 6 deletions(-)
diff --git a/benchmarks/expected-plans/q15.txt b/benchmarks/expected-plans/q15.txt
index e2f59dc5c..e78f8e0d9 100644
--- a/benchmarks/expected-plans/q15.txt
+++ b/benchmarks/expected-plans/q15.txt
@@ -4,8 +4,18 @@ Sort: supplier.s_suppkey ASC NULLS LAST
Inner Join: revenue0.total_revenue = __sq_1.__value
Inner Join: supplier.s_suppkey = revenue0.supplier_no
TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone]
- TableScan: revenue0 projection=[supplier_no, total_revenue]
+ Projection: supplier_no, total_revenue, alias=revenue0
+ Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
+ Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
+ Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
+ Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
+ TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
Projection: MAX(revenue0.total_revenue) AS __value, alias=__sq_1
Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]]
- TableScan: revenue0 projection=[total_revenue]
+ Projection: total_revenue, alias=revenue0
+ Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
+ Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
+ Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
+ Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
+ TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
EmptyRelation
\ No newline at end of file
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 4e9798df9..a699a234c 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -773,6 +773,10 @@ impl TableProvider for DataFrame {
self
}
+ fn get_logical_plan(&self) -> Option<&LogicalPlan> {
+ Some(&self.plan)
+ }
+
fn supports_filter_pushdown(
&self,
_filter: &Expr,
@@ -1337,8 +1341,12 @@ mod tests {
\n Limit: skip=0, fetch=1\
\n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\
\n Inner Join: t1.c1 = t2.c1\
- \n TableScan: t1 projection=[c1, c2, c3]\
- \n TableScan: t2 projection=[c1, c2, c3]",
+ \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t1\
+ \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\
+ \n TableScan: aggregate_test_100 projection=[c1, c2, c3]\
+ \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t2\
+ \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\
+ \n TableScan: aggregate_test_100 projection=[c1, c2, c3]",
format!("{:?}", df_renamed.to_logical_plan()?)
);
diff --git a/datafusion/core/src/datasource/datasource.rs b/datafusion/core/src/datasource/datasource.rs
index 270747347..84111fed0 100644
--- a/datafusion/core/src/datasource/datasource.rs
+++ b/datafusion/core/src/datasource/datasource.rs
@@ -21,6 +21,7 @@ use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
+use datafusion_expr::LogicalPlan;
pub use datafusion_expr::{TableProviderFilterPushDown, TableType};
use crate::arrow::datatypes::SchemaRef;
@@ -47,6 +48,11 @@ pub trait TableProvider: Sync + Send {
None
}
+ /// Get the Logical Plan of this table, if available.
+ fn get_logical_plan(&self) -> Option<&LogicalPlan> {
+ None
+ }
+
/// Create an ExecutionPlan that will scan the table.
/// The table provider will be usually responsible of grouping
/// the source data into partitions that can be efficiently
diff --git a/datafusion/core/src/datasource/default_table_source.rs b/datafusion/core/src/datasource/default_table_source.rs
index 2e65be0bc..bbb9fbdd6 100644
--- a/datafusion/core/src/datasource/default_table_source.rs
+++ b/datafusion/core/src/datasource/default_table_source.rs
@@ -60,6 +60,10 @@ impl TableSource for DefaultTableSource {
) -> datafusion_common::Result<TableProviderFilterPushDown> {
self.table_provider.supports_filter_pushdown(filter)
}
+
+ fn get_logical_plan(&self) -> Option<&datafusion_expr::LogicalPlan> {
+ self.table_provider.get_logical_plan()
+ }
}
/// Wrap TableProvider in TableSource
diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs
index dfe6d42f8..043ce256b 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -77,6 +77,10 @@ impl TableProvider for ViewTable {
self
}
+ fn get_logical_plan(&self) -> Option<&LogicalPlan> {
+ Some(&self.logical_plan)
+ }
+
fn schema(&self) -> SchemaRef {
Arc::clone(&self.table_schema)
}
diff --git a/datafusion/expr/src/table_source.rs b/datafusion/expr/src/table_source.rs
index 990022bbf..10984f779 100644
--- a/datafusion/expr/src/table_source.rs
+++ b/datafusion/expr/src/table_source.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::Expr;
+use crate::{Expr, LogicalPlan};
use arrow::datatypes::SchemaRef;
use std::any::Any;
@@ -76,4 +76,9 @@ pub trait TableSource: Sync + Send {
) -> datafusion_common::Result<TableProviderFilterPushDown> {
Ok(TableProviderFilterPushDown::Unsupported)
}
+
+ /// Get the Logical plan of this table provider, if available.
+ fn get_logical_plan(&self) -> Option<&LogicalPlan> {
+ None
+ }
}
diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs
new file mode 100644
index 000000000..89c78405a
--- /dev/null
+++ b/datafusion/optimizer/src/inline_table_scan.rs
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Optimizer rule that replaces TableScans over sources exposing a
+//! LogicalPlan (such as DataFrames and Views) with that inlined plan,
+//! to support further optimization
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_expr::{
+ logical_plan::LogicalPlan, utils::from_plan, Expr, LogicalPlanBuilder, TableScan,
+};
+
+/// Optimization rule that inlines TableScans whose source provides a [LogicalPlan]
+/// (e.g. [DataFrame] / [ViewTable])
+#[derive(Default)]
+pub struct InlineTableScan;
+
+impl InlineTableScan {
+ #[allow(missing_docs)]
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
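+/// Recursively inlines the plan of each eligible TableScan whose source provides one, aliased to the table name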
+fn inline_table_scan(plan: &LogicalPlan) -> Result<LogicalPlan> {
+ match plan {
+ // Match only on scans without filters / fetch (projection is not checked
+ // here); Views and DataFrames are not expected to have any of those added
+ // during the early stage of planning
+ LogicalPlan::TableScan(TableScan {
+ source,
+ table_name,
+ filters,
+ fetch: None,
+ ..
+ }) if filters.is_empty() => {
+ if let Some(sub_plan) = source.get_logical_plan() {
+ // Recursively apply optimization
+ let plan = inline_table_scan(sub_plan)?;
+ let plan = LogicalPlanBuilder::from(plan).project_with_alias(
+ vec![Expr::Wildcard],
+ Some(table_name.to_string()),
+ )?;
+ plan.build()
+ } else {
+ // No plan available, return with table scan as is
+ Ok(plan.clone())
+ }
+ }
+
+ // Rest: Recurse
+ _ => {
+ // apply the optimization to all inputs of the plan
+ let inputs = plan.inputs();
+ let new_inputs = inputs
+ .iter()
+ .map(|plan| inline_table_scan(plan))
+ .collect::<Result<Vec<_>>>()?;
+
+ from_plan(plan, &plan.expressions(), &new_inputs)
+ }
+ }
+}
+
+impl OptimizerRule for InlineTableScan {
+ fn optimize(
+ &self,
+ plan: &LogicalPlan,
+ _optimizer_config: &mut OptimizerConfig,
+ ) -> Result<LogicalPlan> {
+ inline_table_scan(plan)
+ }
+
+ fn name(&self) -> &str {
+ "inline_table_scan"
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::{sync::Arc, vec};
+
+ use arrow::datatypes::{DataType, Field, Schema};
+ use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, TableSource};
+
+ use crate::{inline_table_scan::InlineTableScan, OptimizerConfig, OptimizerRule};
+
+ pub struct RawTableSource {}
+
+ impl TableSource for RawTableSource {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn schema(&self) -> arrow::datatypes::SchemaRef {
+ Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
+ }
+
+ fn supports_filter_pushdown(
+ &self,
+ _filter: &datafusion_expr::Expr,
+ ) -> datafusion_common::Result<datafusion_expr::TableProviderFilterPushDown>
+ {
+ Ok(datafusion_expr::TableProviderFilterPushDown::Inexact)
+ }
+ }
+
+ pub struct CustomSource {
+ plan: LogicalPlan,
+ }
+ impl CustomSource {
+ fn new() -> Self {
+ Self {
+ plan: LogicalPlanBuilder::scan("y", Arc::new(RawTableSource {}), None)
+ .unwrap()
+ .build()
+ .unwrap(),
+ }
+ }
+ }
+ impl TableSource for CustomSource {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn supports_filter_pushdown(
+ &self,
+ _filter: &datafusion_expr::Expr,
+ ) -> datafusion_common::Result<datafusion_expr::TableProviderFilterPushDown>
+ {
+ Ok(datafusion_expr::TableProviderFilterPushDown::Exact)
+ }
+
+ fn schema(&self) -> arrow::datatypes::SchemaRef {
+ Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
+ }
+
+ fn get_logical_plan(&self) -> Option<&LogicalPlan> {
+ Some(&self.plan)
+ }
+ }
+
+ #[test]
+ fn inline_table_scan() {
+ let rule = InlineTableScan::new();
+
+ let source = Arc::new(CustomSource::new());
+
+ let scan = LogicalPlanBuilder::scan("x".to_string(), source, None).unwrap();
+
+ let plan = scan.filter(col("x.a").eq(lit(1))).unwrap().build().unwrap();
+
+ let optimized_plan = rule
+ .optimize(&plan, &mut OptimizerConfig::new())
+ .expect("failed to optimize plan");
+ let formatted_plan = format!("{:?}", optimized_plan);
+ let expected = "\
+ Filter: x.a = Int32(1)\
+ \n Projection: y.a, alias=x\
+ \n TableScan: y";
+
+ assert_eq!(formatted_plan, expected);
+ assert_eq!(plan.schema(), optimized_plan.schema());
+ }
+}
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index 23814dcf9..5e8108d67 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -23,6 +23,7 @@ pub mod eliminate_limit;
pub mod expr_simplifier;
pub mod filter_null_join_keys;
pub mod filter_push_down;
+pub mod inline_table_scan;
pub mod limit_push_down;
pub mod optimizer;
pub mod projection_push_down;
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 976131e04..7c37284e6 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -24,6 +24,7 @@ use crate::eliminate_filter::EliminateFilter;
use crate::eliminate_limit::EliminateLimit;
use crate::filter_null_join_keys::FilterNullJoinKeys;
use crate::filter_push_down::FilterPushDown;
+use crate::inline_table_scan::InlineTableScan;
use crate::limit_push_down::LimitPushDown;
use crate::projection_push_down::ProjectionPushDown;
use crate::reduce_cross_join::ReduceCrossJoin;
@@ -148,6 +149,7 @@ impl Optimizer {
/// Create a new optimizer using the recommended list of rules
pub fn new(config: &OptimizerConfig) -> Self {
let mut rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
+ Arc::new(InlineTableScan::new()),
Arc::new(TypeCoercion::new()),
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs
index f6430b87a..d6ed6e488 100644
--- a/datafusion/optimizer/src/projection_push_down.rs
+++ b/datafusion/optimizer/src/projection_push_down.rs
@@ -527,7 +527,9 @@ fn optimize_plan(
}
fn projection_equal(p: &Projection, p2: &Projection) -> bool {
- p.expr.len() == p2.expr.len() && p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r)
+ p.expr.len() == p2.expr.len()
+ && p.alias == p2.alias
+ && p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r)
}
#[cfg(test)]