You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/11/16 11:41:36 UTC
[arrow-datafusion] branch master updated: Extract logical plans in LogicalPlan as independent struct: TableScan (#1290)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 79f129d Extract logical plans in LogicalPlan as independent struct: TableScan (#1290)
79f129d is described below
commit 79f129d048667a4552e44ef740e1b1cf9de306a1
Author: Carlos <wx...@gmail.com>
AuthorDate: Tue Nov 16 19:41:14 2021 +0800
Extract logical plans in LogicalPlan as independent struct: TableScan (#1290)
---
ballista/rust/client/src/context.rs | 13 +++++--
.../rust/core/src/serde/logical_plan/to_proto.rs | 5 ++-
datafusion/src/execution/context.rs | 9 +++--
datafusion/src/logical_plan/builder.rs | 7 ++--
datafusion/src/logical_plan/mod.rs | 2 +-
datafusion/src/logical_plan/plan.rs | 44 ++++++++++++----------
datafusion/src/optimizer/filter_push_down.rs | 13 ++++---
datafusion/src/optimizer/limit_push_down.rs | 9 +++--
datafusion/src/optimizer/projection_push_down.rs | 9 +++--
datafusion/src/physical_plan/planner.rs | 5 ++-
datafusion/tests/custom_sources.rs | 6 +--
datafusion/tests/sql.rs | 5 ++-
12 files changed, 72 insertions(+), 55 deletions(-)
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index e619f12..7eb8a1a 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -30,6 +30,7 @@ use datafusion::dataframe::DataFrame;
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::dataframe_impl::DataFrameImpl;
+use datafusion::logical_plan::plan::TableScanPlan;
use datafusion::logical_plan::LogicalPlan;
use datafusion::prelude::{AvroReadOptions, CsvReadOptions};
use datafusion::sql::parser::FileType;
@@ -212,14 +213,18 @@ impl BallistaContext {
options: CsvReadOptions<'_>,
) -> Result<()> {
match self.read_csv(path, options).await?.to_logical_plan() {
- LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
+ LogicalPlan::TableScan(TableScanPlan { source, .. }) => {
+ self.register_table(name, source)
+ }
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
}
}
pub async fn register_parquet(&self, name: &str, path: &str) -> Result<()> {
match self.read_parquet(path).await?.to_logical_plan() {
- LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
+ LogicalPlan::TableScan(TableScanPlan { source, .. }) => {
+ self.register_table(name, source)
+ }
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
}
}
@@ -231,7 +236,9 @@ impl BallistaContext {
options: AvroReadOptions<'_>,
) -> Result<()> {
match self.read_avro(path, options).await?.to_logical_plan() {
- LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
+ LogicalPlan::TableScan(TableScanPlan { source, .. }) => {
+ self.register_table(name, source)
+ }
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
}
}
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 805fe31..b5ecda2 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -32,6 +32,7 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingTable;
use datafusion::logical_plan::{
exprlist_to_fields,
+ plan::TableScanPlan,
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
Column, Expr, JoinConstraint, JoinType, LogicalPlan,
};
@@ -695,13 +696,13 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
)),
})
}
- LogicalPlan::TableScan {
+ LogicalPlan::TableScan(TableScanPlan {
table_name,
source,
filters,
projection,
..
- } => {
+ }) => {
let schema = source.schema();
let source = source.as_any();
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 0baed7c..b79c4fa 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -1178,6 +1178,7 @@ impl FunctionRegistry for ExecutionContextState {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::logical_plan::plan::TableScanPlan;
use crate::logical_plan::{binary_expr, lit, Operator};
use crate::physical_plan::functions::{make_scalar_function, Volatility};
use crate::physical_plan::{collect, collect_partitioned};
@@ -1421,11 +1422,11 @@ mod tests {
let optimized_plan = ctx.optimize(&logical_plan)?;
match &optimized_plan {
LogicalPlan::Projection { input, .. } => match &**input {
- LogicalPlan::TableScan {
+ LogicalPlan::TableScan(TableScanPlan {
source,
projected_schema,
..
- } => {
+ }) => {
assert_eq!(source.schema().fields().len(), 3);
assert_eq!(projected_schema.fields().len(), 1);
}
@@ -1494,11 +1495,11 @@ mod tests {
let optimized_plan = ctx.optimize(&plan)?;
match &optimized_plan {
LogicalPlan::Projection { input, .. } => match &**input {
- LogicalPlan::TableScan {
+ LogicalPlan::TableScan(TableScanPlan {
source,
projected_schema,
..
- } => {
+ }) => {
assert_eq!(source.schema().fields().len(), 3);
assert_eq!(projected_schema.fields().len(), 1);
}
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 0c7950c..d9de6fe 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -25,7 +25,7 @@ use crate::datasource::{
MemTable, TableProvider,
};
use crate::error::{DataFusionError, Result};
-use crate::logical_plan::plan::ToStringifiedPlan;
+use crate::logical_plan::plan::{TableScanPlan, ToStringifiedPlan};
use crate::prelude::*;
use crate::scalar::ScalarValue;
use arrow::{
@@ -392,15 +392,14 @@ impl LogicalPlanBuilder {
DFSchema::try_from_qualified_schema(&table_name, &schema)
})?;
- let table_scan = LogicalPlan::TableScan {
+ let table_scan = LogicalPlan::TableScan(TableScanPlan {
table_name,
source: provider,
projected_schema: Arc::new(projected_schema),
projection,
filters,
limit: None,
- };
-
+ });
Ok(Self::from(table_scan))
}
/// Wrap a plan in a window
diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs
index 8569b35..5db6a99 100644
--- a/datafusion/src/logical_plan/mod.rs
+++ b/datafusion/src/logical_plan/mod.rs
@@ -27,7 +27,7 @@ mod display;
mod expr;
mod extension;
mod operators;
-mod plan;
+pub mod plan;
mod registry;
pub mod window_frames;
pub use builder::{
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index d1e1678..cec0e09 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -57,6 +57,23 @@ pub enum JoinConstraint {
Using,
}
+/// Produces rows from a table provider by reference or from the context
+#[derive(Clone)]
+pub struct TableScanPlan {
+ /// The name of the table
+ pub table_name: String,
+ /// The source of the table
+ pub source: Arc<dyn TableProvider>,
+ /// Optional column indices to use as a projection
+ pub projection: Option<Vec<usize>>,
+ /// The schema description of the output
+ pub projected_schema: DFSchemaRef,
+ /// Optional expressions to be used as filters by the table provider
+ pub filters: Vec<Expr>,
+ /// Optional limit to skip reading
+ pub limit: Option<usize>,
+}
+
/// A LogicalPlan represents the different types of relational
/// operators (such as Projection, Filter, etc) and can be created by
/// the SQL query planner and the DataFrame API.
@@ -164,20 +181,7 @@ pub enum LogicalPlan {
alias: Option<String>,
},
/// Produces rows from a table provider by reference or from the context
- TableScan {
- /// The name of the table
- table_name: String,
- /// The source of the table
- source: Arc<dyn TableProvider>,
- /// Optional column indices to use as a projection
- projection: Option<Vec<usize>>,
- /// The schema description of the output
- projected_schema: DFSchemaRef,
- /// Optional expressions to be used as filters by the table provider
- filters: Vec<Expr>,
- /// Optional limit to skip reading
- limit: Option<usize>,
- },
+ TableScan(TableScanPlan),
/// Produces no rows: An empty relation with an empty schema
EmptyRelation {
/// Whether to produce a placeholder row
@@ -265,9 +269,9 @@ impl LogicalPlan {
match self {
LogicalPlan::EmptyRelation { schema, .. } => schema,
LogicalPlan::Values { schema, .. } => schema,
- LogicalPlan::TableScan {
+ LogicalPlan::TableScan(TableScanPlan {
projected_schema, ..
- } => projected_schema,
+ }) => projected_schema,
LogicalPlan::Projection { schema, .. } => schema,
LogicalPlan::Filter { input, .. } => input.schema(),
LogicalPlan::Window { schema, .. } => schema,
@@ -290,9 +294,9 @@ impl LogicalPlan {
/// Get a vector of references to all schemas in every node of the logical plan
pub fn all_schemas(&self) -> Vec<&DFSchemaRef> {
match self {
- LogicalPlan::TableScan {
+ LogicalPlan::TableScan(TableScanPlan {
projected_schema, ..
- } => vec![projected_schema],
+ }) => vec![projected_schema],
LogicalPlan::Values { schema, .. } => vec![schema],
LogicalPlan::Window { input, schema, .. }
| LogicalPlan::Aggregate { input, schema, .. }
@@ -765,13 +769,13 @@ impl LogicalPlan {
write!(f, "Values: {}{}", str_values.join(", "), elipse)
}
- LogicalPlan::TableScan {
+ LogicalPlan::TableScan(TableScanPlan {
ref table_name,
ref projection,
ref filters,
ref limit,
..
- } => {
+ }) => {
write!(
f,
"TableScan: {} projection={:?}",
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index 843d4e0..a2bf6f8 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -16,6 +16,7 @@
use crate::datasource::datasource::TableProviderFilterPushDown;
use crate::execution::context::ExecutionProps;
+use crate::logical_plan::plan::TableScanPlan;
use crate::logical_plan::{and, replace_col, Column, LogicalPlan};
use crate::logical_plan::{DFSchema, Expr};
use crate::optimizer::optimizer::OptimizerRule;
@@ -451,14 +452,14 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
optimize_join(state, plan, left, right)
}
- LogicalPlan::TableScan {
+ LogicalPlan::TableScan(TableScanPlan {
source,
projected_schema,
filters,
projection,
table_name,
limit,
- } => {
+ }) => {
let mut used_columns = HashSet::new();
let mut new_filters = filters.clone();
@@ -487,14 +488,14 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
issue_filters(
state,
used_columns,
- &LogicalPlan::TableScan {
+ &LogicalPlan::TableScan(TableScanPlan {
source: source.clone(),
projection: projection.clone(),
projected_schema: projected_schema.clone(),
table_name: table_name.clone(),
filters: new_filters,
limit: *limit,
- },
+ }),
)
}
_ => {
@@ -1174,7 +1175,7 @@ mod tests {
) -> Result<LogicalPlan> {
let test_provider = PushDownProvider { filter_support };
- let table_scan = LogicalPlan::TableScan {
+ let table_scan = LogicalPlan::TableScan(TableScanPlan {
table_name: "test".to_string(),
filters: vec![],
projected_schema: Arc::new(DFSchema::try_from(
@@ -1183,7 +1184,7 @@ mod tests {
projection: None,
source: Arc::new(test_provider),
limit: None,
- };
+ });
LogicalPlanBuilder::from(table_scan)
.filter(col("a").eq(lit(1i64)))?
diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs
index d02777c..ff8b8c1 100644
--- a/datafusion/src/optimizer/limit_push_down.rs
+++ b/datafusion/src/optimizer/limit_push_down.rs
@@ -20,6 +20,7 @@
use super::utils;
use crate::error::Result;
use crate::execution::context::ExecutionProps;
+use crate::logical_plan::plan::TableScanPlan;
use crate::logical_plan::LogicalPlan;
use crate::optimizer::optimizer::OptimizerRule;
use std::sync::Arc;
@@ -56,16 +57,16 @@ fn limit_push_down(
})
}
(
- LogicalPlan::TableScan {
+ LogicalPlan::TableScan(TableScanPlan {
table_name,
source,
projection,
filters,
limit,
projected_schema,
- },
+ }),
Some(upper_limit),
- ) => Ok(LogicalPlan::TableScan {
+ ) => Ok(LogicalPlan::TableScan(TableScanPlan {
table_name: table_name.clone(),
source: source.clone(),
projection: projection.clone(),
@@ -74,7 +75,7 @@ fn limit_push_down(
.map(|x| std::cmp::min(x, upper_limit))
.or(Some(upper_limit)),
projected_schema: projected_schema.clone(),
- }),
+ })),
(
LogicalPlan::Projection {
expr,
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index 7c087a1..0b2decc 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -20,6 +20,7 @@
use crate::error::{DataFusionError, Result};
use crate::execution::context::ExecutionProps;
+use crate::logical_plan::plan::TableScanPlan;
use crate::logical_plan::{
build_join_schema, Column, DFField, DFSchema, DFSchemaRef, LogicalPlan,
LogicalPlanBuilder, ToDFSchema,
@@ -328,13 +329,13 @@ fn optimize_plan(
}
// scans:
// * remove un-used columns from the scan projection
- LogicalPlan::TableScan {
+ LogicalPlan::TableScan(TableScanPlan {
table_name,
source,
filters,
limit,
..
- } => {
+ }) => {
let (projection, projected_schema) = get_projected_schema(
Some(table_name),
&source.schema(),
@@ -342,14 +343,14 @@ fn optimize_plan(
has_projection,
)?;
// return the table scan with projection
- Ok(LogicalPlan::TableScan {
+ Ok(LogicalPlan::TableScan(TableScanPlan {
table_name: table_name.clone(),
source: source.clone(),
projection: Some(projection),
projected_schema,
filters: filters.clone(),
limit: *limit,
- })
+ }))
}
LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
"Unsupported logical plan: Explain must be root of the plan".to_string(),
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 402f119..dc7cf25 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -23,6 +23,7 @@ use super::{
hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows,
};
use crate::execution::context::ExecutionContextState;
+use crate::logical_plan::plan::TableScanPlan;
use crate::logical_plan::{
unnormalize_cols, DFSchema, Expr, LogicalPlan, Operator,
Partitioning as LogicalPartitioning, PlanType, ToStringifiedPlan,
@@ -333,13 +334,13 @@ impl DefaultPhysicalPlanner {
let batch_size = ctx_state.config.batch_size;
let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan {
- LogicalPlan::TableScan {
+ LogicalPlan::TableScan (TableScanPlan {
source,
projection,
filters,
limit,
..
- } => {
+ }) => {
// Remove all qualifiers from the scan as the provider
// doesn't know (nor should care) how the relation was
// referred to in the query
diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs
index a29a265..de44b82 100644
--- a/datafusion/tests/custom_sources.rs
+++ b/datafusion/tests/custom_sources.rs
@@ -31,7 +31,7 @@ use datafusion::{
use datafusion::execution::context::ExecutionContext;
use datafusion::logical_plan::{
- col, Expr, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
+ col, plan::TableScanPlan, Expr, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
};
use datafusion::physical_plan::{
ColumnStatistics, ExecutionPlan, Partitioning, RecordBatchStream,
@@ -217,11 +217,11 @@ async fn custom_source_dataframe() -> Result<()> {
let optimized_plan = ctx.optimize(&logical_plan)?;
match &optimized_plan {
LogicalPlan::Projection { input, .. } => match &**input {
- LogicalPlan::TableScan {
+ LogicalPlan::TableScan(TableScanPlan {
source,
projected_schema,
..
- } => {
+ }) => {
assert_eq!(source.schema().fields().len(), 2);
assert_eq!(projected_schema.fields().len(), 1);
}
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 15241ee..b06b170 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -36,6 +36,7 @@ use datafusion::assert_batches_eq;
use datafusion::assert_batches_sorted_eq;
use datafusion::assert_contains;
use datafusion::assert_not_contains;
+use datafusion::logical_plan::plan::TableScanPlan;
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::functions::Volatility;
use datafusion::physical_plan::metrics::MetricValue;
@@ -92,10 +93,10 @@ async fn nyc() -> Result<()> {
match &optimized_plan {
LogicalPlan::Projection { input, .. } => match input.as_ref() {
LogicalPlan::Aggregate { input, .. } => match input.as_ref() {
- LogicalPlan::TableScan {
+ LogicalPlan::TableScan(TableScanPlan {
ref projected_schema,
..
- } => {
+ }) => {
assert_eq!(2, projected_schema.fields().len());
assert_eq!(projected_schema.field(0).name(), "passenger_count");
assert_eq!(projected_schema.field(1).name(), "fare_amount");