You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/11/16 11:41:36 UTC

[arrow-datafusion] branch master updated: Extract logical plans in LogicalPlan as independent struct: TableScan (#1290)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 79f129d  Extract logical plans in LogicalPlan as independent struct: TableScan (#1290)
79f129d is described below

commit 79f129d048667a4552e44ef740e1b1cf9de306a1
Author: Carlos <wx...@gmail.com>
AuthorDate: Tue Nov 16 19:41:14 2021 +0800

    Extract logical plans in LogicalPlan as independent struct: TableScan (#1290)
---
 ballista/rust/client/src/context.rs                | 13 +++++--
 .../rust/core/src/serde/logical_plan/to_proto.rs   |  5 ++-
 datafusion/src/execution/context.rs                |  9 +++--
 datafusion/src/logical_plan/builder.rs             |  7 ++--
 datafusion/src/logical_plan/mod.rs                 |  2 +-
 datafusion/src/logical_plan/plan.rs                | 44 ++++++++++++----------
 datafusion/src/optimizer/filter_push_down.rs       | 13 ++++---
 datafusion/src/optimizer/limit_push_down.rs        |  9 +++--
 datafusion/src/optimizer/projection_push_down.rs   |  9 +++--
 datafusion/src/physical_plan/planner.rs            |  5 ++-
 datafusion/tests/custom_sources.rs                 |  6 +--
 datafusion/tests/sql.rs                            |  5 ++-
 12 files changed, 72 insertions(+), 55 deletions(-)

diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index e619f12..7eb8a1a 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -30,6 +30,7 @@ use datafusion::dataframe::DataFrame;
 use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::dataframe_impl::DataFrameImpl;
+use datafusion::logical_plan::plan::TableScanPlan;
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::prelude::{AvroReadOptions, CsvReadOptions};
 use datafusion::sql::parser::FileType;
@@ -212,14 +213,18 @@ impl BallistaContext {
         options: CsvReadOptions<'_>,
     ) -> Result<()> {
         match self.read_csv(path, options).await?.to_logical_plan() {
-            LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
+            LogicalPlan::TableScan(TableScanPlan { source, .. }) => {
+                self.register_table(name, source)
+            }
             _ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
         }
     }
 
     pub async fn register_parquet(&self, name: &str, path: &str) -> Result<()> {
         match self.read_parquet(path).await?.to_logical_plan() {
-            LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
+            LogicalPlan::TableScan(TableScanPlan { source, .. }) => {
+                self.register_table(name, source)
+            }
             _ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
         }
     }
@@ -231,7 +236,9 @@ impl BallistaContext {
         options: AvroReadOptions<'_>,
     ) -> Result<()> {
         match self.read_avro(path, options).await?.to_logical_plan() {
-            LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
+            LogicalPlan::TableScan(TableScanPlan { source, .. }) => {
+                self.register_table(name, source)
+            }
             _ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
         }
     }
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 805fe31..b5ecda2 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -32,6 +32,7 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::ListingTable;
 use datafusion::logical_plan::{
     exprlist_to_fields,
+    plan::TableScanPlan,
     window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
     Column, Expr, JoinConstraint, JoinType, LogicalPlan,
 };
@@ -695,13 +696,13 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                     )),
                 })
             }
-            LogicalPlan::TableScan {
+            LogicalPlan::TableScan(TableScanPlan {
                 table_name,
                 source,
                 filters,
                 projection,
                 ..
-            } => {
+            }) => {
                 let schema = source.schema();
                 let source = source.as_any();
 
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 0baed7c..b79c4fa 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -1178,6 +1178,7 @@ impl FunctionRegistry for ExecutionContextState {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::logical_plan::plan::TableScanPlan;
     use crate::logical_plan::{binary_expr, lit, Operator};
     use crate::physical_plan::functions::{make_scalar_function, Volatility};
     use crate::physical_plan::{collect, collect_partitioned};
@@ -1421,11 +1422,11 @@ mod tests {
         let optimized_plan = ctx.optimize(&logical_plan)?;
         match &optimized_plan {
             LogicalPlan::Projection { input, .. } => match &**input {
-                LogicalPlan::TableScan {
+                LogicalPlan::TableScan(TableScanPlan {
                     source,
                     projected_schema,
                     ..
-                } => {
+                }) => {
                     assert_eq!(source.schema().fields().len(), 3);
                     assert_eq!(projected_schema.fields().len(), 1);
                 }
@@ -1494,11 +1495,11 @@ mod tests {
         let optimized_plan = ctx.optimize(&plan)?;
         match &optimized_plan {
             LogicalPlan::Projection { input, .. } => match &**input {
-                LogicalPlan::TableScan {
+                LogicalPlan::TableScan(TableScanPlan {
                     source,
                     projected_schema,
                     ..
-                } => {
+                }) => {
                     assert_eq!(source.schema().fields().len(), 3);
                     assert_eq!(projected_schema.fields().len(), 1);
                 }
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 0c7950c..d9de6fe 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -25,7 +25,7 @@ use crate::datasource::{
     MemTable, TableProvider,
 };
 use crate::error::{DataFusionError, Result};
-use crate::logical_plan::plan::ToStringifiedPlan;
+use crate::logical_plan::plan::{TableScanPlan, ToStringifiedPlan};
 use crate::prelude::*;
 use crate::scalar::ScalarValue;
 use arrow::{
@@ -392,15 +392,14 @@ impl LogicalPlanBuilder {
                 DFSchema::try_from_qualified_schema(&table_name, &schema)
             })?;
 
-        let table_scan = LogicalPlan::TableScan {
+        let table_scan = LogicalPlan::TableScan(TableScanPlan {
             table_name,
             source: provider,
             projected_schema: Arc::new(projected_schema),
             projection,
             filters,
             limit: None,
-        };
-
+        });
         Ok(Self::from(table_scan))
     }
     /// Wrap a plan in a window
diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs
index 8569b35..5db6a99 100644
--- a/datafusion/src/logical_plan/mod.rs
+++ b/datafusion/src/logical_plan/mod.rs
@@ -27,7 +27,7 @@ mod display;
 mod expr;
 mod extension;
 mod operators;
-mod plan;
+pub mod plan;
 mod registry;
 pub mod window_frames;
 pub use builder::{
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index d1e1678..cec0e09 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -57,6 +57,23 @@ pub enum JoinConstraint {
     Using,
 }
 
+/// Produces rows from a table provider by reference or from the context
+#[derive(Clone)]
+pub struct TableScanPlan {
+    /// The name of the table
+    pub table_name: String,
+    /// The source of the table
+    pub source: Arc<dyn TableProvider>,
+    /// Optional column indices to use as a projection
+    pub projection: Option<Vec<usize>>,
+    /// The schema description of the output
+    pub projected_schema: DFSchemaRef,
+    /// Optional expressions to be used as filters by the table provider
+    pub filters: Vec<Expr>,
+    /// Optional limit to skip reading
+    pub limit: Option<usize>,
+}
+
 /// A LogicalPlan represents the different types of relational
 /// operators (such as Projection, Filter, etc) and can be created by
 /// the SQL query planner and the DataFrame API.
@@ -164,20 +181,7 @@ pub enum LogicalPlan {
         alias: Option<String>,
     },
     /// Produces rows from a table provider by reference or from the context
-    TableScan {
-        /// The name of the table
-        table_name: String,
-        /// The source of the table
-        source: Arc<dyn TableProvider>,
-        /// Optional column indices to use as a projection
-        projection: Option<Vec<usize>>,
-        /// The schema description of the output
-        projected_schema: DFSchemaRef,
-        /// Optional expressions to be used as filters by the table provider
-        filters: Vec<Expr>,
-        /// Optional limit to skip reading
-        limit: Option<usize>,
-    },
+    TableScan(TableScanPlan),
     /// Produces no rows: An empty relation with an empty schema
     EmptyRelation {
         /// Whether to produce a placeholder row
@@ -265,9 +269,9 @@ impl LogicalPlan {
         match self {
             LogicalPlan::EmptyRelation { schema, .. } => schema,
             LogicalPlan::Values { schema, .. } => schema,
-            LogicalPlan::TableScan {
+            LogicalPlan::TableScan(TableScanPlan {
                 projected_schema, ..
-            } => projected_schema,
+            }) => projected_schema,
             LogicalPlan::Projection { schema, .. } => schema,
             LogicalPlan::Filter { input, .. } => input.schema(),
             LogicalPlan::Window { schema, .. } => schema,
@@ -290,9 +294,9 @@ impl LogicalPlan {
     /// Get a vector of references to all schemas in every node of the logical plan
     pub fn all_schemas(&self) -> Vec<&DFSchemaRef> {
         match self {
-            LogicalPlan::TableScan {
+            LogicalPlan::TableScan(TableScanPlan {
                 projected_schema, ..
-            } => vec![projected_schema],
+            }) => vec![projected_schema],
             LogicalPlan::Values { schema, .. } => vec![schema],
             LogicalPlan::Window { input, schema, .. }
             | LogicalPlan::Aggregate { input, schema, .. }
@@ -765,13 +769,13 @@ impl LogicalPlan {
                         write!(f, "Values: {}{}", str_values.join(", "), elipse)
                     }
 
-                    LogicalPlan::TableScan {
+                    LogicalPlan::TableScan(TableScanPlan {
                         ref table_name,
                         ref projection,
                         ref filters,
                         ref limit,
                         ..
-                    } => {
+                    }) => {
                         write!(
                             f,
                             "TableScan: {} projection={:?}",
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index 843d4e0..a2bf6f8 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -16,6 +16,7 @@
 
 use crate::datasource::datasource::TableProviderFilterPushDown;
 use crate::execution::context::ExecutionProps;
+use crate::logical_plan::plan::TableScanPlan;
 use crate::logical_plan::{and, replace_col, Column, LogicalPlan};
 use crate::logical_plan::{DFSchema, Expr};
 use crate::optimizer::optimizer::OptimizerRule;
@@ -451,14 +452,14 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
 
             optimize_join(state, plan, left, right)
         }
-        LogicalPlan::TableScan {
+        LogicalPlan::TableScan(TableScanPlan {
             source,
             projected_schema,
             filters,
             projection,
             table_name,
             limit,
-        } => {
+        }) => {
             let mut used_columns = HashSet::new();
             let mut new_filters = filters.clone();
 
@@ -487,14 +488,14 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
             issue_filters(
                 state,
                 used_columns,
-                &LogicalPlan::TableScan {
+                &LogicalPlan::TableScan(TableScanPlan {
                     source: source.clone(),
                     projection: projection.clone(),
                     projected_schema: projected_schema.clone(),
                     table_name: table_name.clone(),
                     filters: new_filters,
                     limit: *limit,
-                },
+                }),
             )
         }
         _ => {
@@ -1174,7 +1175,7 @@ mod tests {
     ) -> Result<LogicalPlan> {
         let test_provider = PushDownProvider { filter_support };
 
-        let table_scan = LogicalPlan::TableScan {
+        let table_scan = LogicalPlan::TableScan(TableScanPlan {
             table_name: "test".to_string(),
             filters: vec![],
             projected_schema: Arc::new(DFSchema::try_from(
@@ -1183,7 +1184,7 @@ mod tests {
             projection: None,
             source: Arc::new(test_provider),
             limit: None,
-        };
+        });
 
         LogicalPlanBuilder::from(table_scan)
             .filter(col("a").eq(lit(1i64)))?
diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs
index d02777c..ff8b8c1 100644
--- a/datafusion/src/optimizer/limit_push_down.rs
+++ b/datafusion/src/optimizer/limit_push_down.rs
@@ -20,6 +20,7 @@
 use super::utils;
 use crate::error::Result;
 use crate::execution::context::ExecutionProps;
+use crate::logical_plan::plan::TableScanPlan;
 use crate::logical_plan::LogicalPlan;
 use crate::optimizer::optimizer::OptimizerRule;
 use std::sync::Arc;
@@ -56,16 +57,16 @@ fn limit_push_down(
             })
         }
         (
-            LogicalPlan::TableScan {
+            LogicalPlan::TableScan(TableScanPlan {
                 table_name,
                 source,
                 projection,
                 filters,
                 limit,
                 projected_schema,
-            },
+            }),
             Some(upper_limit),
-        ) => Ok(LogicalPlan::TableScan {
+        ) => Ok(LogicalPlan::TableScan(TableScanPlan {
             table_name: table_name.clone(),
             source: source.clone(),
             projection: projection.clone(),
@@ -74,7 +75,7 @@ fn limit_push_down(
                 .map(|x| std::cmp::min(x, upper_limit))
                 .or(Some(upper_limit)),
             projected_schema: projected_schema.clone(),
-        }),
+        })),
         (
             LogicalPlan::Projection {
                 expr,
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index 7c087a1..0b2decc 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -20,6 +20,7 @@
 
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::ExecutionProps;
+use crate::logical_plan::plan::TableScanPlan;
 use crate::logical_plan::{
     build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan,
     LogicalPlanBuilder, ToDFSchema,
@@ -328,13 +329,13 @@ fn optimize_plan(
         }
         // scans:
         // * remove un-used columns from the scan projection
-        LogicalPlan::TableScan {
+        LogicalPlan::TableScan(TableScanPlan {
             table_name,
             source,
             filters,
             limit,
             ..
-        } => {
+        }) => {
             let (projection, projected_schema) = get_projected_schema(
                 Some(table_name),
                 &source.schema(),
@@ -342,14 +343,14 @@ fn optimize_plan(
                 has_projection,
             )?;
             // return the table scan with projection
-            Ok(LogicalPlan::TableScan {
+            Ok(LogicalPlan::TableScan(TableScanPlan {
                 table_name: table_name.clone(),
                 source: source.clone(),
                 projection: Some(projection),
                 projected_schema,
                 filters: filters.clone(),
                 limit: *limit,
-            })
+            }))
         }
         LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
             "Unsupported logical plan: Explain must be root of the plan".to_string(),
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 402f119..dc7cf25 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -23,6 +23,7 @@ use super::{
     hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows,
 };
 use crate::execution::context::ExecutionContextState;
+use crate::logical_plan::plan::TableScanPlan;
 use crate::logical_plan::{
     unnormalize_cols, DFSchema, Expr, LogicalPlan, Operator,
     Partitioning as LogicalPartitioning, PlanType, ToStringifiedPlan,
@@ -333,13 +334,13 @@ impl DefaultPhysicalPlanner {
             let batch_size = ctx_state.config.batch_size;
 
             let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan {
-                LogicalPlan::TableScan {
+                LogicalPlan::TableScan (TableScanPlan {
                     source,
                     projection,
                     filters,
                     limit,
                     ..
-                } => {
+                }) => {
                     // Remove all qualifiers from the scan as the provider
                     // doesn't know (nor should care) how the relation was
                     // referred to in the query
diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs
index a29a265..de44b82 100644
--- a/datafusion/tests/custom_sources.rs
+++ b/datafusion/tests/custom_sources.rs
@@ -31,7 +31,7 @@ use datafusion::{
 
 use datafusion::execution::context::ExecutionContext;
 use datafusion::logical_plan::{
-    col, Expr, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
+    col, plan::TableScanPlan, Expr, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
 };
 use datafusion::physical_plan::{
     ColumnStatistics, ExecutionPlan, Partitioning, RecordBatchStream,
@@ -217,11 +217,11 @@ async fn custom_source_dataframe() -> Result<()> {
     let optimized_plan = ctx.optimize(&logical_plan)?;
     match &optimized_plan {
         LogicalPlan::Projection { input, .. } => match &**input {
-            LogicalPlan::TableScan {
+            LogicalPlan::TableScan(TableScanPlan {
                 source,
                 projected_schema,
                 ..
-            } => {
+            }) => {
                 assert_eq!(source.schema().fields().len(), 2);
                 assert_eq!(projected_schema.fields().len(), 1);
             }
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 15241ee..b06b170 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -36,6 +36,7 @@ use datafusion::assert_batches_eq;
 use datafusion::assert_batches_sorted_eq;
 use datafusion::assert_contains;
 use datafusion::assert_not_contains;
+use datafusion::logical_plan::plan::TableScanPlan;
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::physical_plan::functions::Volatility;
 use datafusion::physical_plan::metrics::MetricValue;
@@ -92,10 +93,10 @@ async fn nyc() -> Result<()> {
     match &optimized_plan {
         LogicalPlan::Projection { input, .. } => match input.as_ref() {
             LogicalPlan::Aggregate { input, .. } => match input.as_ref() {
-                LogicalPlan::TableScan {
+                LogicalPlan::TableScan(TableScanPlan {
                     ref projected_schema,
                     ..
-                } => {
+                }) => {
                     assert_eq!(2, projected_schema.fields().len());
                     assert_eq!(projected_schema.field(0).name(), "passenger_count");
                     assert_eq!(projected_schema.field(1).name(), "fare_amount");