You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/11/18 12:12:09 UTC
[arrow-datafusion] branch master updated: Extract CreateMemoryTable, DropTable, CreateExternalTable in LogicalPlan as independent struct (#1311)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new b8ff94c Extract CreateMemoryTable, DropTable, CreateExternalTable in LogicalPlan as independent struct (#1311)
b8ff94c is described below
commit b8ff94ce9ebdf861c595c06a78860b5e1dbf2d33
Author: Kun Liu <li...@apache.org>
AuthorDate: Thu Nov 18 20:12:04 2021 +0800
Extract CreateMemoryTable, DropTable, CreateExternalTable in LogicalPlan as independent struct (#1311)
* refactor the plan for: table ddl
* fix the conflict for import
---
ballista/rust/client/src/context.rs | 7 +-
.../rust/core/src/serde/logical_plan/from_proto.rs | 8 +-
ballista/rust/core/src/serde/logical_plan/mod.rs | 18 ++--
.../rust/core/src/serde/logical_plan/to_proto.rs | 12 +--
ballista/rust/core/src/utils.rs | 2 +-
datafusion/src/execution/context.rs | 13 +--
datafusion/src/logical_plan/mod.rs | 3 +-
datafusion/src/logical_plan/plan.rs | 117 +++++++++++++--------
.../src/optimizer/common_subexpr_eliminate.rs | 6 +-
datafusion/src/optimizer/constant_folding.rs | 6 +-
datafusion/src/optimizer/filter_push_down.rs | 3 +-
datafusion/src/optimizer/limit_push_down.rs | 2 +-
datafusion/src/optimizer/projection_push_down.rs | 6 +-
datafusion/src/optimizer/utils.rs | 15 +--
datafusion/src/physical_plan/planner.rs | 6 +-
datafusion/src/sql/planner.rs | 16 +--
datafusion/tests/custom_sources.rs | 2 +-
datafusion/tests/sql.rs | 2 +-
18 files changed, 137 insertions(+), 107 deletions(-)
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 7eb8a1a..00a92de 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -30,8 +30,7 @@ use datafusion::dataframe::DataFrame;
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::dataframe_impl::DataFrameImpl;
-use datafusion::logical_plan::plan::TableScanPlan;
-use datafusion::logical_plan::LogicalPlan;
+use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScanPlan};
use datafusion::prelude::{AvroReadOptions, CsvReadOptions};
use datafusion::sql::parser::FileType;
@@ -270,13 +269,13 @@ impl BallistaContext {
let plan = ctx.create_logical_plan(sql)?;
match plan {
- LogicalPlan::CreateExternalTable {
+ LogicalPlan::CreateExternalTable(CreateExternalTable {
ref schema,
ref name,
ref location,
ref file_type,
ref has_header,
- } => match file_type {
+ }) => match file_type {
FileType::CSV => {
self.register_csv(
name,
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 66bec78..ba40488 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -33,8 +33,8 @@ use datafusion::logical_plan::window_frames::{
};
use datafusion::logical_plan::{
abs, acos, asin, atan, ceil, cos, digest, exp, floor, ln, log10, log2, round, signum,
- sin, sqrt, tan, trunc, Column, DFField, DFSchema, Expr, JoinConstraint, JoinType,
- LogicalPlan, LogicalPlanBuilder, Operator,
+ sin, sqrt, tan, trunc, Column, CreateExternalTable, DFField, DFSchema, Expr,
+ JoinConstraint, JoinType, LogicalPlan, LogicalPlanBuilder, Operator,
};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
@@ -271,13 +271,13 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
let pb_file_type: protobuf::FileType =
create_extern_table.file_type.try_into()?;
- Ok(LogicalPlan::CreateExternalTable {
+ Ok(LogicalPlan::CreateExternalTable(CreateExternalTable {
schema: pb_schema.try_into()?,
name: create_extern_table.name.clone(),
location: create_extern_table.location.clone(),
file_type: pb_file_type.into(),
has_header: create_extern_table.has_header,
- })
+ }))
}
LogicalPlanType::Analyze(analyze) => {
let input: LogicalPlan = convert_box_required!(analyze.input)?;
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 09bcf1f..472399b 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -28,7 +28,8 @@ mod roundtrip_tests {
arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit},
datasource::object_store::local::LocalFileSystem,
logical_plan::{
- col, Expr, LogicalPlan, LogicalPlanBuilder, Partitioning, ToDFSchema,
+ col, CreateExternalTable, Expr, LogicalPlan, LogicalPlanBuilder,
+ Partitioning, ToDFSchema,
},
physical_plan::functions::BuiltinScalarFunction::Sqrt,
prelude::*,
@@ -655,13 +656,14 @@ mod roundtrip_tests {
];
for file in filetypes.iter() {
- let create_table_node = LogicalPlan::CreateExternalTable {
- schema: df_schema_ref.clone(),
- name: String::from("TestName"),
- location: String::from("employee.csv"),
- file_type: *file,
- has_header: true,
- };
+ let create_table_node =
+ LogicalPlan::CreateExternalTable(CreateExternalTable {
+ schema: df_schema_ref.clone(),
+ name: String::from("TestName"),
+ location: String::from("employee.csv"),
+ file_type: *file,
+ has_header: true,
+ });
roundtrip_test!(create_table_node);
}
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index da36c9f..825a4b9 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -32,9 +32,9 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingTable;
use datafusion::logical_plan::{
exprlist_to_fields,
- plan::TableScanPlan,
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
- Column, Expr, JoinConstraint, JoinType, LogicalPlan,
+ Column, CreateExternalTable, Expr, JoinConstraint, JoinType, LogicalPlan,
+ TableScanPlan,
};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::functions::BuiltinScalarFunction;
@@ -945,13 +945,13 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
},
)),
}),
- LogicalPlan::CreateExternalTable {
+ LogicalPlan::CreateExternalTable(CreateExternalTable {
name,
location,
file_type,
has_header,
schema: df_schema,
- } => {
+ }) => {
use datafusion::sql::parser::FileType;
let pb_file_type: protobuf::FileType = match file_type {
@@ -1009,10 +1009,10 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
))),
})
}
- LogicalPlan::CreateMemoryTable { .. } => Err(proto_error(
+ LogicalPlan::CreateMemoryTable(_) => Err(proto_error(
"Error converting CreateMemoryTable. Not yet supported in Ballista",
)),
- LogicalPlan::DropTable { .. } => Err(proto_error(
+ LogicalPlan::DropTable(_) => Err(proto_error(
"Error converting DropTable. Not yet supported in Ballista",
)),
}
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 80391b3..2dfdb3d 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -277,7 +277,7 @@ impl QueryPlanner for BallistaQueryPlanner {
_ctx_state: &ExecutionContextState,
) -> std::result::Result<Arc<dyn ExecutionPlan>, DataFusionError> {
match logical_plan {
- LogicalPlan::CreateExternalTable { .. } => {
+ LogicalPlan::CreateExternalTable(_) => {
// table state is managed locally in the BallistaContext, not in the scheduler
Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))))
}
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 08e2e70..52c5160 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -63,7 +63,8 @@ use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
use crate::execution::dataframe_impl::DataFrameImpl;
use crate::logical_plan::{
- FunctionRegistry, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
+ CreateExternalTable, CreateMemoryTable, DropTable, FunctionRegistry, LogicalPlan,
+ LogicalPlanBuilder, UNNAMED_TABLE,
};
use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use crate::optimizer::constant_folding::ConstantFolding;
@@ -191,13 +192,13 @@ impl ExecutionContext {
pub async fn sql(&mut self, sql: &str) -> Result<Arc<dyn DataFrame>> {
let plan = self.create_logical_plan(sql)?;
match plan {
- LogicalPlan::CreateExternalTable {
+ LogicalPlan::CreateExternalTable(CreateExternalTable {
ref schema,
ref name,
ref location,
ref file_type,
ref has_header,
- } => {
+ }) => {
let file_format = match file_type {
FileType::CSV => {
Ok(Arc::new(CsvFormat::default().with_has_header(*has_header))
@@ -241,7 +242,7 @@ impl ExecutionContext {
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}
- LogicalPlan::CreateMemoryTable { input, name } => {
+ LogicalPlan::CreateMemoryTable(CreateMemoryTable { name, input }) => {
let plan = self.optimize(&input)?;
let physical = Arc::new(DataFrameImpl::new(self.state.clone(), &plan));
@@ -256,7 +257,7 @@ impl ExecutionContext {
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}
- LogicalPlan::DropTable { name, if_exist, .. } => {
+ LogicalPlan::DropTable(DropTable { name, if_exist, .. }) => {
let returned = self.deregister_table(name.as_str())?;
if !if_exist && returned.is_none() {
Err(DataFusionError::Execution(format!(
@@ -1174,7 +1175,7 @@ impl FunctionRegistry for ExecutionContextState {
#[cfg(test)]
mod tests {
use super::*;
- use crate::logical_plan::plan::TableScanPlan;
+ use crate::logical_plan::TableScanPlan;
use crate::logical_plan::{binary_expr, lit, Operator};
use crate::physical_plan::functions::{make_scalar_function, Volatility};
use crate::physical_plan::{collect, collect_partitioned};
diff --git a/datafusion/src/logical_plan/mod.rs b/datafusion/src/logical_plan/mod.rs
index 5db6a99..77193eb 100644
--- a/datafusion/src/logical_plan/mod.rs
+++ b/datafusion/src/logical_plan/mod.rs
@@ -51,7 +51,8 @@ pub use expr::{
pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
pub use plan::{
- JoinConstraint, JoinType, LogicalPlan, Partitioning, PlanType, PlanVisitor,
+ CreateExternalTable, CreateMemoryTable, DropTable, JoinConstraint, JoinType,
+ LogicalPlan, Partitioning, PlanType, PlanVisitor, TableScanPlan,
};
pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan};
pub use registry::FunctionRegistry;
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index d9f3d9a..ea56bc0 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -74,6 +74,41 @@ pub struct TableScanPlan {
pub limit: Option<usize>,
}
+/// Creates an in memory table.
+#[derive(Clone)]
+pub struct CreateMemoryTable {
+ /// The table name
+ pub name: String,
+ /// The logical plan
+ pub input: Arc<LogicalPlan>,
+}
+
+/// Creates an external table.
+#[derive(Clone)]
+pub struct CreateExternalTable {
+ /// The table schema
+ pub schema: DFSchemaRef,
+ /// The table name
+ pub name: String,
+ /// The physical location
+ pub location: String,
+ /// The file type of physical file
+ pub file_type: FileType,
+ /// Whether the CSV file contains a header
+ pub has_header: bool,
+}
+
+/// Drops a table.
+#[derive(Clone)]
+pub struct DropTable {
+ /// The table name
+ pub name: String,
+ /// If the table exists
+ pub if_exist: bool,
+ /// Dummy schema
+ pub schema: DFSchemaRef,
+}
+
/// Produces a relation with string representations of
/// various parts of the plan
#[derive(Clone)]
@@ -230,34 +265,11 @@ pub enum LogicalPlan {
input: Arc<LogicalPlan>,
},
/// Creates an external table.
- CreateExternalTable {
- /// The table schema
- schema: DFSchemaRef,
- /// The table name
- name: String,
- /// The physical location
- location: String,
- /// The file type of physical file
- file_type: FileType,
- /// Whether the CSV file contains a header
- has_header: bool,
- },
+ CreateExternalTable(CreateExternalTable),
/// Creates an in memory table.
- CreateMemoryTable {
- /// The table name
- name: String,
- /// The logical plan
- input: Arc<LogicalPlan>,
- },
+ CreateMemoryTable(CreateMemoryTable),
/// Drops a table.
- DropTable {
- /// The table name
- name: String,
- /// If the table exists
- if_exist: bool,
- /// Dummy schema
- schema: DFSchemaRef,
- },
+ DropTable(DropTable),
/// Values expression. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
@@ -295,13 +307,17 @@ impl LogicalPlan {
LogicalPlan::CrossJoin { schema, .. } => schema,
LogicalPlan::Repartition { input, .. } => input.schema(),
LogicalPlan::Limit { input, .. } => input.schema(),
- LogicalPlan::CreateExternalTable { schema, .. } => schema,
+ LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
+ schema
+ }
LogicalPlan::Explain(explain) => &explain.schema,
LogicalPlan::Analyze(analyze) => &analyze.schema,
LogicalPlan::Extension(extension) => extension.node.schema(),
LogicalPlan::Union { schema, .. } => schema,
- LogicalPlan::CreateMemoryTable { input, .. } => input.schema(),
- LogicalPlan::DropTable { schema, .. } => schema,
+ LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
+ input.schema()
+ }
+ LogicalPlan::DropTable(DropTable { schema, .. }) => schema,
}
}
@@ -342,13 +358,15 @@ impl LogicalPlan {
LogicalPlan::Explain(ExplainPlan { schema, .. })
| LogicalPlan::Analyze(AnalyzePlan { schema, .. })
| LogicalPlan::EmptyRelation { schema, .. }
- | LogicalPlan::CreateExternalTable { schema, .. } => vec![schema],
+ | LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
+ vec![schema]
+ }
LogicalPlan::Limit { input, .. }
| LogicalPlan::Repartition { input, .. }
| LogicalPlan::Sort { input, .. }
- | LogicalPlan::CreateMemoryTable { input, .. }
+ | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
| LogicalPlan::Filter { input, .. } => input.all_schemas(),
- LogicalPlan::DropTable { .. } => vec![],
+ LogicalPlan::DropTable(_) => vec![],
}
}
@@ -393,9 +411,9 @@ impl LogicalPlan {
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Limit { .. }
- | LogicalPlan::CreateExternalTable { .. }
- | LogicalPlan::CreateMemoryTable { .. }
- | LogicalPlan::DropTable { .. }
+ | LogicalPlan::CreateExternalTable(_)
+ | LogicalPlan::CreateMemoryTable(_)
+ | LogicalPlan::DropTable(_)
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::Explain { .. }
@@ -422,13 +440,15 @@ impl LogicalPlan {
LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
LogicalPlan::Explain(explain) => vec![&explain.plan],
LogicalPlan::Analyze(analyze) => vec![&analyze.input],
- LogicalPlan::CreateMemoryTable { input, .. } => vec![input],
+ LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
+ vec![input]
+ }
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
- | LogicalPlan::CreateExternalTable { .. }
- | LogicalPlan::DropTable { .. } => vec![],
+ | LogicalPlan::CreateExternalTable(_)
+ | LogicalPlan::DropTable(_) => vec![],
}
}
@@ -561,7 +581,9 @@ impl LogicalPlan {
true
}
LogicalPlan::Limit { input, .. } => input.accept(visitor)?,
- LogicalPlan::CreateMemoryTable { input, .. } => input.accept(visitor)?,
+ LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
+ input.accept(visitor)?
+ }
LogicalPlan::Extension(extension) => {
for input in extension.node.inputs() {
if !input.accept(visitor)? {
@@ -576,8 +598,8 @@ impl LogicalPlan {
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
- | LogicalPlan::CreateExternalTable { .. }
- | LogicalPlan::DropTable { .. } => true,
+ | LogicalPlan::CreateExternalTable(_)
+ | LogicalPlan::DropTable(_) => true,
};
if !recurse {
return Ok(false);
@@ -889,15 +911,18 @@ impl LogicalPlan {
}
},
LogicalPlan::Limit { ref n, .. } => write!(f, "Limit: {}", n),
- LogicalPlan::CreateExternalTable { ref name, .. } => {
+ LogicalPlan::CreateExternalTable(CreateExternalTable {
+ ref name,
+ ..
+ }) => {
write!(f, "CreateExternalTable: {:?}", name)
}
- LogicalPlan::CreateMemoryTable { ref name, .. } => {
+ LogicalPlan::CreateMemoryTable(CreateMemoryTable {
+ name, ..
+ }) => {
write!(f, "CreateMemoryTable: {:?}", name)
}
- LogicalPlan::DropTable {
- ref name, if_exist, ..
- } => {
+ LogicalPlan::DropTable(DropTable { name, if_exist, .. }) => {
write!(f, "DropTable: {:?} if not exist:={}", name, if_exist)
}
LogicalPlan::Explain { .. } => write!(f, "Explain"),
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index c88032e..9e7a9dd 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -202,11 +202,11 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
| LogicalPlan::Values { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Limit { .. }
- | LogicalPlan::CreateExternalTable { .. }
+ | LogicalPlan::CreateExternalTable(_)
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. }
- | LogicalPlan::CreateMemoryTable { .. }
- | LogicalPlan::DropTable { .. }
+ | LogicalPlan::CreateMemoryTable(_)
+ | LogicalPlan::DropTable(_)
| LogicalPlan::Extension { .. } => {
// apply the optimization to all inputs of the plan
let expr = plan.expressions();
diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs
index a0bc04a..06bac6c 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -69,9 +69,9 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::Window { .. }
| LogicalPlan::Aggregate { .. }
| LogicalPlan::Repartition { .. }
- | LogicalPlan::CreateExternalTable { .. }
- | LogicalPlan::CreateMemoryTable { .. }
- | LogicalPlan::DropTable { .. }
+ | LogicalPlan::CreateExternalTable(_)
+ | LogicalPlan::CreateMemoryTable(_)
+ | LogicalPlan::DropTable(_)
| LogicalPlan::Values { .. }
| LogicalPlan::Extension { .. }
| LogicalPlan::Sort { .. }
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index a2bf6f8..1bb1a18 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -16,8 +16,7 @@
use crate::datasource::datasource::TableProviderFilterPushDown;
use crate::execution::context::ExecutionProps;
-use crate::logical_plan::plan::TableScanPlan;
-use crate::logical_plan::{and, replace_col, Column, LogicalPlan};
+use crate::logical_plan::{and, replace_col, Column, LogicalPlan, TableScanPlan};
use crate::logical_plan::{DFSchema, Expr};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
diff --git a/datafusion/src/optimizer/limit_push_down.rs b/datafusion/src/optimizer/limit_push_down.rs
index ff8b8c1..842ca08 100644
--- a/datafusion/src/optimizer/limit_push_down.rs
+++ b/datafusion/src/optimizer/limit_push_down.rs
@@ -20,8 +20,8 @@
use super::utils;
use crate::error::Result;
use crate::execution::context::ExecutionProps;
-use crate::logical_plan::plan::TableScanPlan;
use crate::logical_plan::LogicalPlan;
+use crate::logical_plan::TableScanPlan;
use crate::optimizer::optimizer::OptimizerRule;
use std::sync::Arc;
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index e1abb5c..f3a7c61 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -433,9 +433,9 @@ fn optimize_plan(
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::Sort { .. }
- | LogicalPlan::CreateExternalTable { .. }
- | LogicalPlan::CreateMemoryTable { .. }
- | LogicalPlan::DropTable { .. }
+ | LogicalPlan::CreateExternalTable(_)
+ | LogicalPlan::CreateMemoryTable(_)
+ | LogicalPlan::DropTable(_)
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Extension { .. } => {
let expr = plan.expressions();
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index 259c35a..fe14442 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -25,8 +25,9 @@ use super::optimizer::OptimizerRule;
use crate::execution::context::{ExecutionContextState, ExecutionProps};
use crate::logical_plan::plan::{AnalyzePlan, ExtensionPlan};
use crate::logical_plan::{
- build_join_schema, Column, DFSchema, DFSchemaRef, Expr, ExprRewriter, LogicalPlan,
- LogicalPlanBuilder, Operator, Partitioning, Recursion, RewriteRecursion,
+ build_join_schema, Column, CreateMemoryTable, DFSchema, DFSchemaRef, Expr,
+ ExprRewriter, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, Recursion,
+ RewriteRecursion,
};
use crate::physical_plan::functions::Volatility;
use crate::physical_plan::planner::DefaultPhysicalPlanner;
@@ -223,11 +224,11 @@ pub fn from_plan(
n: *n,
input: Arc::new(inputs[0].clone()),
}),
- LogicalPlan::CreateMemoryTable { name, .. } => {
- Ok(LogicalPlan::CreateMemoryTable {
+ LogicalPlan::CreateMemoryTable(CreateMemoryTable { name, .. }) => {
+ Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
input: Arc::new(inputs[0].clone()),
name: name.clone(),
- })
+ }))
}
LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(ExtensionPlan {
node: e.node.from_template(expr, inputs),
@@ -262,8 +263,8 @@ pub fn from_plan(
}
LogicalPlan::EmptyRelation { .. }
| LogicalPlan::TableScan { .. }
- | LogicalPlan::CreateExternalTable { .. }
- | LogicalPlan::DropTable { .. } => {
+ | LogicalPlan::CreateExternalTable(_)
+ | LogicalPlan::DropTable(_) => {
// All of these plan types have no inputs / exprs so should not be called
assert!(expr.is_empty(), "{:?} should have no exprs", plan);
assert!(inputs.is_empty(), "{:?} should have no inputs", plan);
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 5287e71..25ed0e4 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -23,7 +23,7 @@ use super::{
hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows,
};
use crate::execution::context::ExecutionContextState;
-use crate::logical_plan::plan::TableScanPlan;
+use crate::logical_plan::TableScanPlan;
use crate::logical_plan::{
unnormalize_cols, DFSchema, Expr, LogicalPlan, Operator,
Partitioning as LogicalPartitioning, PlanType, ToStringifiedPlan,
@@ -803,7 +803,7 @@ impl DefaultPhysicalPlanner {
Ok(Arc::new(GlobalLimitExec::new(input, limit)))
}
- LogicalPlan::CreateExternalTable { .. } => {
+ LogicalPlan::CreateExternalTable(_) => {
// There is no default plan for "CREATE EXTERNAL
// TABLE" -- it must be handled at a higher level (so
// that the appropriate table can be registered with
@@ -812,7 +812,7 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: CreateExternalTable".to_string(),
))
}
- | LogicalPlan::CreateMemoryTable {..} | LogicalPlan::DropTable {..} => {
+ | LogicalPlan::CreateMemoryTable(_) | LogicalPlan::DropTable (_) => {
// Create a dummy exec.
Ok(Arc::new(EmptyExec::new(
false,
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 87c3749..575acd0 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -29,9 +29,11 @@ use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
use crate::logical_plan::Expr::Alias;
use crate::logical_plan::{
and, builder::expand_wildcard, col, lit, normalize_col, union_with_alias, Column,
- DFSchema, DFSchemaRef, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType,
+ CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, DFSchema,
+ DFSchemaRef, DropTable, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType,
ToDFSchema, ToStringifiedPlan,
};
+
use crate::optimizer::utils::exprlist_to_columns;
use crate::prelude::JoinType;
use crate::scalar::ScalarValue;
@@ -158,10 +160,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
{
let plan = self.query_to_plan(query)?;
- Ok(LogicalPlan::CreateMemoryTable {
+ Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
name: name.to_string(),
input: Arc::new(plan),
- })
+ }))
}
Statement::CreateTable { .. } => Err(DataFusionError::NotImplemented(
"Only `CREATE TABLE table_name AS SELECT ...` statement is supported"
@@ -177,11 +179,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
} =>
// We don't support cascade and purge for now.
{
- Ok(LogicalPlan::DropTable {
+ Ok(LogicalPlan::DropTable(DropTable {
name: names.get(0).unwrap().to_string(),
if_exist: *if_exists,
schema: DFSchemaRef::new(DFSchema::empty()),
- })
+ }))
}
Statement::ShowColumns {
@@ -306,13 +308,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let schema = self.build_schema(columns)?;
- Ok(LogicalPlan::CreateExternalTable {
+ Ok(LogicalPlan::CreateExternalTable(PlanCreateExternalTable {
schema: schema.to_dfschema_ref()?,
name: name.clone(),
location: location.clone(),
file_type: *file_type,
has_header: *has_header,
- })
+ }))
}
/// Generate a plan for EXPLAIN ... that will print out a plan
diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs
index de44b82..25040a6 100644
--- a/datafusion/tests/custom_sources.rs
+++ b/datafusion/tests/custom_sources.rs
@@ -31,7 +31,7 @@ use datafusion::{
use datafusion::execution::context::ExecutionContext;
use datafusion::logical_plan::{
- col, plan::TableScanPlan, Expr, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
+ col, Expr, LogicalPlan, LogicalPlanBuilder, TableScanPlan, UNNAMED_TABLE,
};
use datafusion::physical_plan::{
ColumnStatistics, ExecutionPlan, Partitioning, RecordBatchStream,
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index b06b170..63d4d69 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -36,8 +36,8 @@ use datafusion::assert_batches_eq;
use datafusion::assert_batches_sorted_eq;
use datafusion::assert_contains;
use datafusion::assert_not_contains;
-use datafusion::logical_plan::plan::TableScanPlan;
use datafusion::logical_plan::LogicalPlan;
+use datafusion::logical_plan::TableScanPlan;
use datafusion::physical_plan::functions::Volatility;
use datafusion::physical_plan::metrics::MetricValue;
use datafusion::physical_plan::ExecutionPlan;