You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2022/10/22 15:52:10 UTC

[arrow-datafusion] branch master updated: Support inlining view / dataframes logical plan (#3923)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 318b4ad56 Support inlining view / dataframes logical plan (#3923)
318b4ad56 is described below

commit 318b4ad56c826b317b0201f7031a47609ad215b5
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Sat Oct 22 17:52:04 2022 +0200

    Support inlining view / dataframes logical plan (#3923)
    
    * Inline TableScans for views and dataframes
    
    * Inline TableScans for views and dataframes
    
    * Inline TableScans for views and dataframes
    
    * WIP
    
    * WIP
    
    * WIP
    
    * WIP
    
    * WIP
    
    * WIP
    
    * fmt
    
    * doc
    
    * Fix test
    
    * Simplify
    
    * Fix
    
    * Rename test source
    
    * Use plan instead of projected schema
    
    * Docs
    
    * Use SubqueryAlias
    
    * Revert "Use SubqueryAlias"
    
    This reverts commit 207c2a967296f1b124a82d85220543b574313b34.
    
    * WIP
    
    * Fix issue
    
    * Clippy
    
    * Format
---
 benchmarks/expected-plans/q15.txt                  |  14 +-
 datafusion/core/src/dataframe.rs                   |  12 +-
 datafusion/core/src/datasource/datasource.rs       |   6 +
 .../core/src/datasource/default_table_source.rs    |   4 +
 datafusion/core/src/datasource/view.rs             |   4 +
 datafusion/expr/src/table_source.rs                |   7 +-
 datafusion/optimizer/src/inline_table_scan.rs      | 180 +++++++++++++++++++++
 datafusion/optimizer/src/lib.rs                    |   1 +
 datafusion/optimizer/src/optimizer.rs              |   2 +
 datafusion/optimizer/src/projection_push_down.rs   |   4 +-
 10 files changed, 228 insertions(+), 6 deletions(-)

diff --git a/benchmarks/expected-plans/q15.txt b/benchmarks/expected-plans/q15.txt
index e2f59dc5c..e78f8e0d9 100644
--- a/benchmarks/expected-plans/q15.txt
+++ b/benchmarks/expected-plans/q15.txt
@@ -4,8 +4,18 @@ Sort: supplier.s_suppkey ASC NULLS LAST
     Inner Join: revenue0.total_revenue = __sq_1.__value
       Inner Join: supplier.s_suppkey = revenue0.supplier_no
         TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone]
-        TableScan: revenue0 projection=[supplier_no, total_revenue]
+        Projection: supplier_no, total_revenue, alias=revenue0
+          Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
+            Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
+              Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
+                Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
+                  TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
       Projection: MAX(revenue0.total_revenue) AS __value, alias=__sq_1
         Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]]
-          TableScan: revenue0 projection=[total_revenue]
+          Projection: total_revenue, alias=revenue0
+            Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
+              Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
+                Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
+                  Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
+                    TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
 EmptyRelation
\ No newline at end of file
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 4e9798df9..a699a234c 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -773,6 +773,10 @@ impl TableProvider for DataFrame {
         self
     }
 
+    fn get_logical_plan(&self) -> Option<&LogicalPlan> {
+        Some(&self.plan)
+    }
+
     fn supports_filter_pushdown(
         &self,
         _filter: &Expr,
@@ -1337,8 +1341,12 @@ mod tests {
         \n  Limit: skip=0, fetch=1\
         \n    Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\
         \n      Inner Join: t1.c1 = t2.c1\
-        \n        TableScan: t1 projection=[c1, c2, c3]\
-        \n        TableScan: t2 projection=[c1, c2, c3]",
+        \n        Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t1\
+        \n          Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\
+        \n            TableScan: aggregate_test_100 projection=[c1, c2, c3]\
+        \n        Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t2\
+        \n          Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\
+        \n            TableScan: aggregate_test_100 projection=[c1, c2, c3]",
                    format!("{:?}", df_renamed.to_logical_plan()?)
         );
 
diff --git a/datafusion/core/src/datasource/datasource.rs b/datafusion/core/src/datasource/datasource.rs
index 270747347..84111fed0 100644
--- a/datafusion/core/src/datasource/datasource.rs
+++ b/datafusion/core/src/datasource/datasource.rs
@@ -21,6 +21,7 @@ use std::any::Any;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use datafusion_expr::LogicalPlan;
 pub use datafusion_expr::{TableProviderFilterPushDown, TableType};
 
 use crate::arrow::datatypes::SchemaRef;
@@ -47,6 +48,11 @@ pub trait TableProvider: Sync + Send {
         None
     }
 
+    /// Get the Logical Plan of this table, if available.
+    fn get_logical_plan(&self) -> Option<&LogicalPlan> {
+        None
+    }
+
     /// Create an ExecutionPlan that will scan the table.
     /// The table provider will be usually responsible of grouping
     /// the source data into partitions that can be efficiently
diff --git a/datafusion/core/src/datasource/default_table_source.rs b/datafusion/core/src/datasource/default_table_source.rs
index 2e65be0bc..bbb9fbdd6 100644
--- a/datafusion/core/src/datasource/default_table_source.rs
+++ b/datafusion/core/src/datasource/default_table_source.rs
@@ -60,6 +60,10 @@ impl TableSource for DefaultTableSource {
     ) -> datafusion_common::Result<TableProviderFilterPushDown> {
         self.table_provider.supports_filter_pushdown(filter)
     }
+
+    fn get_logical_plan(&self) -> Option<&datafusion_expr::LogicalPlan> {
+        self.table_provider.get_logical_plan()
+    }
 }
 
 /// Wrap TableProvider in TableSource
diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs
index dfe6d42f8..043ce256b 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -77,6 +77,10 @@ impl TableProvider for ViewTable {
         self
     }
 
+    fn get_logical_plan(&self) -> Option<&LogicalPlan> {
+        Some(&self.logical_plan)
+    }
+
     fn schema(&self) -> SchemaRef {
         Arc::clone(&self.table_schema)
     }
diff --git a/datafusion/expr/src/table_source.rs b/datafusion/expr/src/table_source.rs
index 990022bbf..10984f779 100644
--- a/datafusion/expr/src/table_source.rs
+++ b/datafusion/expr/src/table_source.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::Expr;
+use crate::{Expr, LogicalPlan};
 use arrow::datatypes::SchemaRef;
 use std::any::Any;
 
@@ -76,4 +76,9 @@ pub trait TableSource: Sync + Send {
     ) -> datafusion_common::Result<TableProviderFilterPushDown> {
         Ok(TableProviderFilterPushDown::Unsupported)
     }
+
+    /// Get the Logical plan of this table provider, if available.
+    fn get_logical_plan(&self) -> Option<&LogicalPlan> {
+        None
+    }
 }
diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs
new file mode 100644
index 000000000..89c78405a
--- /dev/null
+++ b/datafusion/optimizer/src/inline_table_scan.rs
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Optimizer rule that replaces `TableScan`s of sources providing a logical plan
+//! (such as DataFrames and Views) with that plan, inlining it so that it can be
+//! optimized further
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_expr::{
+    logical_plan::LogicalPlan, utils::from_plan, Expr, LogicalPlanBuilder, TableScan,
+};
+
+/// Optimization rule that inlines `TableScan`s whose source provides a
+/// [`LogicalPlan`], such as a `DataFrame` or a `ViewTable` from the core crate
+#[derive(Default)]
+pub struct InlineTableScan;

+impl InlineTableScan {
+    /// Creates a new instance of this rule.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// Inlines each plain `TableScan` whose source provides a [`LogicalPlan`] (views,
+/// DataFrames) as a wildcard projection of that plan, aliased to the table name.
+fn inline_table_scan(plan: &LogicalPlan) -> Result<LogicalPlan> {
+    match plan {
+        // Match only on scans without filter / projection / fetch.
+        // Views and DataFrames won't have those added during the early stage
+        // of planning, and inlining a projected scan with a wildcard projection
+        // would change its output schema, so such scans are left as is.
+        LogicalPlan::TableScan(TableScan {
+            source,
+            table_name,
+            filters,
+            projection: None,
+            fetch: None,
+            ..
+        }) if filters.is_empty() => {
+            if let Some(sub_plan) = source.get_logical_plan() {
+                // Recursively inline scans nested inside the provided plan
+                let plan = inline_table_scan(sub_plan)?;
+                let plan = LogicalPlanBuilder::from(plan).project_with_alias(
+                    vec![Expr::Wildcard],
+                    Some(table_name.to_string()),
+                )?;
+                plan.build()
+            } else {
+                // No plan available, return with table scan as is
+                Ok(plan.clone())
+            }
+        }
+        // Rest: apply the optimization to all inputs of the plan
+        _ => {
+            let new_inputs = plan
+                .inputs()
+                .iter()
+                .map(|plan| inline_table_scan(plan))
+                .collect::<Result<Vec<_>>>()?;
+            from_plan(plan, &plan.expressions(), &new_inputs)
+        }
+    }
+}
+
+impl OptimizerRule for InlineTableScan {
+    fn name(&self) -> &str {
+        "inline_table_scan"
+    }
+
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        _config: &mut OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        inline_table_scan(plan)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{sync::Arc, vec};

+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, TableSource};

+    use crate::{inline_table_scan::InlineTableScan, OptimizerConfig, OptimizerRule};

+    /// Plain source: a single `a: Int64` column and no logical plan of its own.
+    struct LeafSource;
+    impl TableSource for LeafSource {
+        fn as_any(&self) -> &dyn std::any::Any {
+            self
+        }

+        fn schema(&self) -> arrow::datatypes::SchemaRef {
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
+        }

+        fn supports_filter_pushdown(
+            &self,
+            _filter: &datafusion_expr::Expr,
+        ) -> datafusion_common::Result<datafusion_expr::TableProviderFilterPushDown>
+        {
+            Ok(datafusion_expr::TableProviderFilterPushDown::Inexact)
+        }
+    }

+    /// View-like source whose logical plan scans a `LeafSource` named `y`.
+    struct ViewSource {
+        plan: LogicalPlan,
+    }
+    impl ViewSource {
+        fn new() -> Self {
+            let leaf = Arc::new(LeafSource);
+            let builder = LogicalPlanBuilder::scan("y", leaf, None).unwrap();
+            Self {
+                plan: builder.build().unwrap(),
+            }
+        }
+    }
+    impl TableSource for ViewSource {
+        fn as_any(&self) -> &dyn std::any::Any {
+            self
+        }

+        fn schema(&self) -> arrow::datatypes::SchemaRef {
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
+        }

+        fn supports_filter_pushdown(
+            &self,
+            _filter: &datafusion_expr::Expr,
+        ) -> datafusion_common::Result<datafusion_expr::TableProviderFilterPushDown>
+        {
+            Ok(datafusion_expr::TableProviderFilterPushDown::Exact)
+        }

+        fn get_logical_plan(&self) -> Option<&LogicalPlan> {
+            Some(&self.plan)
+        }
+    }

+    #[test]
+    fn inline_table_scan() {
+        let view = Arc::new(ViewSource::new());
+        let input = LogicalPlanBuilder::scan("x".to_string(), view, None)
+            .unwrap()
+            .filter(col("x.a").eq(lit(1)))
+            .unwrap()
+            .build()
+            .unwrap();

+        let inlined = InlineTableScan::new()
+            .optimize(&input, &mut OptimizerConfig::new())
+            .expect("failed to optimize plan");

+        let expected = "\
+        Filter: x.a = Int32(1)\
+        \n  Projection: y.a, alias=x\
+        \n    TableScan: y";

+        assert_eq!(format!("{:?}", inlined), expected);
+        assert_eq!(input.schema(), inlined.schema());
+    }
+}
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index 23814dcf9..5e8108d67 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -23,6 +23,7 @@ pub mod eliminate_limit;
 pub mod expr_simplifier;
 pub mod filter_null_join_keys;
 pub mod filter_push_down;
+pub mod inline_table_scan;
 pub mod limit_push_down;
 pub mod optimizer;
 pub mod projection_push_down;
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 976131e04..7c37284e6 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -24,6 +24,7 @@ use crate::eliminate_filter::EliminateFilter;
 use crate::eliminate_limit::EliminateLimit;
 use crate::filter_null_join_keys::FilterNullJoinKeys;
 use crate::filter_push_down::FilterPushDown;
+use crate::inline_table_scan::InlineTableScan;
 use crate::limit_push_down::LimitPushDown;
 use crate::projection_push_down::ProjectionPushDown;
 use crate::reduce_cross_join::ReduceCrossJoin;
@@ -148,6 +149,7 @@ impl Optimizer {
     /// Create a new optimizer using the recommended list of rules
     pub fn new(config: &OptimizerConfig) -> Self {
         let mut rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
+            Arc::new(InlineTableScan::new()),
             Arc::new(TypeCoercion::new()),
             Arc::new(SimplifyExpressions::new()),
             Arc::new(UnwrapCastInComparison::new()),
diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs
index f6430b87a..d6ed6e488 100644
--- a/datafusion/optimizer/src/projection_push_down.rs
+++ b/datafusion/optimizer/src/projection_push_down.rs
@@ -527,7 +527,9 @@ fn optimize_plan(
 }
 
 fn projection_equal(p: &Projection, p2: &Projection) -> bool {
-    p.expr.len() == p2.expr.len() && p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r)
+    p.expr.len() == p2.expr.len()
+        && p.alias == p2.alias
+        && p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r)
 }
 
 #[cfg(test)]