You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/04/04 14:15:18 UTC
[arrow-datafusion] branch main updated: Move `TransactionStart`/`TransactionEnd`/`SetVariable` into `LogicalPlan::Statement` (#5842)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6dfbe48328 Move `TransactionStart`/`TransactionEnd`/`SetVariable` into `LogicalPlan::Statement` (#5842)
6dfbe48328 is described below
commit 6dfbe483289ef0b10c82ffa98e3be8b5201acb78
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Apr 4 16:15:11 2023 +0200
Move `TransactionStart`/`TransactionEnd`/`SetVariable` into `LogicalPlan::Statement` (#5842)
---
datafusion/core/src/execution/context.rs | 12 +-
datafusion/core/src/physical_plan/planner.rs | 16 +-
datafusion/expr/src/lib.rs | 12 +-
datafusion/expr/src/logical_plan/mod.rs | 16 +-
datafusion/expr/src/logical_plan/plan.rs | 141 ++--------------
datafusion/expr/src/logical_plan/statement.rs | 188 +++++++++++++++++++++
datafusion/expr/src/utils.rs | 4 +-
.../optimizer/src/common_subexpr_eliminate.rs | 4 +-
datafusion/proto/src/logical_plan/mod.rs | 10 +-
datafusion/sql/src/statement.rs | 33 ++--
10 files changed, 251 insertions(+), 185 deletions(-)
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 9ff7c66004..eb9f6ba3c4 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -31,7 +31,9 @@ use crate::{
optimizer::PhysicalOptimizerRule,
},
};
-use datafusion_expr::{DescribeTable, DmlStatement, StringifiedPlan, WriteOp};
+use datafusion_expr::{
+ logical_plan::Statement, DescribeTable, DmlStatement, StringifiedPlan, WriteOp,
+};
pub use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::var_provider::is_system_variables;
use parking_lot::RwLock;
@@ -444,9 +446,11 @@ impl SessionContext {
}
}
- LogicalPlan::SetVariable(SetVariable {
- variable, value, ..
- }) => {
+ LogicalPlan::Statement(Statement::SetVariable(SetVariable {
+ variable,
+ value,
+ ..
+ })) => {
let mut state = self.state.write();
state.config.options_mut().set(&variable, &value)?;
drop(state);
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index f9a9209454..25afdb4d1c 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1156,21 +1156,11 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: Dml".to_string(),
))
}
- LogicalPlan::TransactionStart(_) => {
+ LogicalPlan::Statement(statement) => {
// DataFusion is a read-only query engine, but also a library, so consumers may implement this
+ let name = statement.name();
Err(DataFusionError::NotImplemented(
- "Unsupported logical plan: TransactionStart".to_string(),
- ))
- }
- LogicalPlan::TransactionEnd(_) => {
- // DataFusion is a read-only query engine, but also a library, so consumers may implement this
- Err(DataFusionError::NotImplemented(
- "Unsupported logical plan: TransactionEnd".to_string(),
- ))
- }
- LogicalPlan::SetVariable(_) => {
- Err(DataFusionError::Internal(
- "Unsupported logical plan: SetVariable must be root of the plan".to_string(),
+ format!("Unsupported logical plan: Statement({name})")
))
}
LogicalPlan::DescribeTable(_) => {
diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs
index 8291f6f34b..0d5963021e 100644
--- a/datafusion/expr/src/lib.rs
+++ b/datafusion/expr/src/lib.rs
@@ -70,13 +70,15 @@ pub use logical_plan::{
builder::{
build_join_schema, union, wrap_projection_for_join_if_necessary, UNNAMED_TABLE,
},
- Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
+ Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, CrossJoin, DescribeTable, Distinct, DmlStatement,
DropTable, DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint,
- JoinType, Limit, LogicalPlan, LogicalPlanBuilder, Partitioning, PlanType, Projection,
- Repartition, SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
- ToStringifiedPlan, Union, Unnest, UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
- Values, Window, WriteOp,
+ JoinType, Limit, LogicalPlan, LogicalPlanBuilder, Partitioning, PlanType, Prepare,
+ Projection, Repartition, SetVariable, Sort, Statement, StringifiedPlan, Subquery,
+ SubqueryAlias, TableScan, ToStringifiedPlan, TransactionAccessMode,
+ TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
+ Union, Unnest, UserDefinedLogicalNode, UserDefinedLogicalNodeCore, Values, Window,
+ WriteOp,
};
pub use nullif::SUPPORTED_NULLIF_TYPES;
pub use operator::Operator;
diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs
index 764d02547b..d2b0410097 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -19,16 +19,20 @@ pub mod builder;
pub mod display;
mod extension;
mod plan;
+mod statement;
pub use builder::{table_scan, LogicalPlanBuilder};
pub use plan::{
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
- CreateMemoryTable, CreateView, CrossJoin, DescribeTable, Distinct, DmlStatement,
- DropTable, DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint,
- JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare, Projection,
- Repartition, SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
- ToStringifiedPlan, TransactionAccessMode, TransactionConclusion, TransactionEnd,
- TransactionIsolationLevel, TransactionStart, Union, Unnest, Values, Window, WriteOp,
+ CreateMemoryTable, CreateView, CrossJoin, DescribeTable, Distinct, DropTable,
+ DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType,
+ Limit, LogicalPlan, Partitioning, PlanType, Prepare, Projection, Repartition, Sort,
+ StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
+ Unnest, Values, Window,
+};
+pub use statement::{
+ DmlStatement, SetVariable, Statement, TransactionAccessMode, TransactionConclusion,
+ TransactionEnd, TransactionIsolationLevel, TransactionStart, WriteOp,
};
pub use display::display_schema;
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index b4fc9edd6b..e527a0a70c 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -19,6 +19,8 @@
use crate::logical_plan::builder::validate_unique_names;
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
+use crate::logical_plan::statement::{DmlStatement, Statement};
+
use crate::logical_plan::plan;
use crate::utils::{
enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs, from_plan,
@@ -88,6 +90,8 @@ pub enum LogicalPlan {
SubqueryAlias(SubqueryAlias),
/// Skip some number of rows, and then fetch some number of rows.
Limit(Limit),
+ /// [`Statement`]
+ Statement(Statement),
/// Creates an external table.
CreateExternalTable(CreateExternalTable),
/// Creates an in memory table.
@@ -116,8 +120,6 @@ pub enum LogicalPlan {
Extension(Extension),
/// Remove duplicate rows from the input
Distinct(Distinct),
- /// Set a Variable
- SetVariable(SetVariable),
/// Prepare a statement
Prepare(Prepare),
/// Insert / Update / Delete
@@ -126,10 +128,6 @@ pub enum LogicalPlan {
DescribeTable(DescribeTable),
/// Unnest a column that contains a nested list type.
Unnest(Unnest),
- // Begin a transaction
- TransactionStart(TransactionStart),
- // Commit or rollback a transaction
- TransactionEnd(TransactionEnd),
}
impl LogicalPlan {
@@ -151,6 +149,7 @@ impl LogicalPlan {
LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
+ LogicalPlan::Statement(statement) => statement.schema(),
LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
@@ -169,14 +168,11 @@ impl LogicalPlan {
LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => schema,
LogicalPlan::DropTable(DropTable { schema, .. }) => schema,
LogicalPlan::DropView(DropView { schema, .. }) => schema,
- LogicalPlan::SetVariable(SetVariable { schema, .. }) => schema,
LogicalPlan::DescribeTable(DescribeTable { dummy_schema, .. }) => {
dummy_schema
}
LogicalPlan::Dml(DmlStatement { table_schema, .. }) => table_schema,
LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
- LogicalPlan::TransactionStart(TransactionStart { schema, .. }) => schema,
- LogicalPlan::TransactionEnd(TransactionEnd { schema, .. }) => schema,
}
}
@@ -243,12 +239,10 @@ impl LogicalPlan {
self.inputs().iter().map(|p| p.schema()).collect()
}
// return empty
- LogicalPlan::DropTable(_)
+ LogicalPlan::Statement(_)
+ | LogicalPlan::DropTable(_)
| LogicalPlan::DropView(_)
- | LogicalPlan::DescribeTable(_)
- | LogicalPlan::TransactionStart(_)
- | LogicalPlan::TransactionEnd(_)
- | LogicalPlan::SetVariable(_) => vec![],
+ | LogicalPlan::DescribeTable(_) => vec![],
}
}
@@ -362,15 +356,13 @@ impl LogicalPlan {
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Limit(_)
+ | LogicalPlan::Statement(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::CreateView(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
- | LogicalPlan::TransactionStart(_)
- | LogicalPlan::TransactionEnd(_)
- | LogicalPlan::SetVariable(_)
| LogicalPlan::DropView(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Analyze(_)
@@ -414,15 +406,13 @@ impl LogicalPlan {
LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
// plans without inputs
LogicalPlan::TableScan { .. }
+ | LogicalPlan::Statement { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
- | LogicalPlan::TransactionStart(_)
- | LogicalPlan::TransactionEnd(_)
- | LogicalPlan::SetVariable(_)
| LogicalPlan::DropView(_)
| LogicalPlan::DescribeTable(_) => vec![],
}
@@ -1039,6 +1029,9 @@ impl LogicalPlan {
LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
write!(f, "SubqueryAlias: {alias}")
}
+ LogicalPlan::Statement(statement) => {
+ write!(f, "{}", statement.display())
+ }
LogicalPlan::CreateExternalTable(CreateExternalTable {
ref name,
..
@@ -1077,30 +1070,11 @@ impl LogicalPlan {
}) => {
write!(f, "DropTable: {name:?} if not exist:={if_exists}")
}
- LogicalPlan::TransactionStart(TransactionStart {
- access_mode,
- isolation_level,
- ..
- }) => {
- write!(f, "TransactionStart: {access_mode:?} {isolation_level:?}")
- }
- LogicalPlan::TransactionEnd(TransactionEnd {
- conclusion,
- chain,
- ..
- }) => {
- write!(f, "TransactionEnd: {conclusion:?} chain:={chain}")
- }
LogicalPlan::DropView(DropView {
name, if_exists, ..
}) => {
write!(f, "DropView: {name:?} if not exist:={if_exists}")
}
- LogicalPlan::SetVariable(SetVariable {
- variable, value, ..
- }) => {
- write!(f, "SetVariable: set {variable:?} to {value:?}")
- }
LogicalPlan::Distinct(Distinct { .. }) => {
write!(f, "Distinct:")
}
@@ -1234,18 +1208,6 @@ pub struct DropView {
pub schema: DFSchemaRef,
}
-/// Set a Variable's value -- value in
-/// [`ConfigOptions`](datafusion_common::config::ConfigOptions)
-#[derive(Clone, PartialEq, Eq, Hash)]
-pub struct SetVariable {
- /// The variable name
- pub variable: String,
- /// The value to set
- pub value: String,
- /// Dummy schema
- pub schema: DFSchemaRef,
-}
-
/// Produces no rows: An empty relation with an empty schema
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
@@ -1569,83 +1531,6 @@ impl Hash for CreateExternalTable {
}
}
-#[derive(Clone, PartialEq, Eq, Hash)]
-pub enum WriteOp {
- Insert,
- Delete,
- Update,
- Ctas,
-}
-
-impl Display for WriteOp {
- fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
- match self {
- WriteOp::Insert => write!(f, "Insert"),
- WriteOp::Delete => write!(f, "Delete"),
- WriteOp::Update => write!(f, "Update"),
- WriteOp::Ctas => write!(f, "Ctas"),
- }
- }
-}
-
-/// The operator that modifies the content of a database (adapted from substrait WriteRel)
-#[derive(Clone, PartialEq, Eq, Hash)]
-pub struct DmlStatement {
- /// The table name
- pub table_name: OwnedTableReference,
- /// The schema of the table (must align with Rel input)
- pub table_schema: DFSchemaRef,
- /// The type of operation to perform
- pub op: WriteOp,
- /// The relation that determines the tuples to add/remove/modify the schema must match with table_schema
- pub input: Arc<LogicalPlan>,
-}
-
-/// Indicates if a transaction was committed or aborted
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
-pub enum TransactionConclusion {
- Commit,
- Rollback,
-}
-
-/// Indicates if this transaction is allowed to write
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
-pub enum TransactionAccessMode {
- ReadOnly,
- ReadWrite,
-}
-
-/// Indicates ANSI transaction isolation level
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
-pub enum TransactionIsolationLevel {
- ReadUncommitted,
- ReadCommitted,
- RepeatableRead,
- Serializable,
-}
-
-/// Indicator that the following statements should be committed or rolled back atomically
-#[derive(Clone, PartialEq, Eq, Hash)]
-pub struct TransactionStart {
- /// indicates if transaction is allowed to write
- pub access_mode: TransactionAccessMode,
- // indicates ANSI isolation level
- pub isolation_level: TransactionIsolationLevel,
- /// Empty schema
- pub schema: DFSchemaRef,
-}
-
-/// Indicator that any current transaction should be terminated
-#[derive(Clone, PartialEq, Eq, Hash)]
-pub struct TransactionEnd {
- /// whether the transaction committed or aborted
- pub conclusion: TransactionConclusion,
- /// if specified a new transaction is immediately started with same characteristics
- pub chain: bool,
- /// Empty schema
- pub schema: DFSchemaRef,
-}
-
/// Prepare a statement but do not execute it. Prepare statements can have 0 or more
/// `Expr::Placeholder` expressions that are filled in during execution
#[derive(Clone, PartialEq, Eq, Hash)]
diff --git a/datafusion/expr/src/logical_plan/statement.rs b/datafusion/expr/src/logical_plan/statement.rs
new file mode 100644
index 0000000000..dad09d996b
--- /dev/null
+++ b/datafusion/expr/src/logical_plan/statement.rs
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+ fmt::{self, Display},
+ sync::Arc,
+};
+
+use datafusion_common::{DFSchemaRef, OwnedTableReference};
+
+use crate::LogicalPlan;
+
+/// Various types of Statements.
+///
+/// # Transactions:
+///
+/// While DataFusion does not support transactions, it provides
+/// [`LogicalPlan`](crate::LogicalPlan) support to assist building
+/// database systems using DataFusion
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub enum Statement {
+ /// Begin a transaction
+ TransactionStart(TransactionStart),
+ /// Commit or rollback a transaction
+ TransactionEnd(TransactionEnd),
+ /// Set a Variable
+ SetVariable(SetVariable),
+}
+
+impl Statement {
+ /// Get a reference to the logical plan's schema
+ pub fn schema(&self) -> &DFSchemaRef {
+ match self {
+ Statement::TransactionStart(TransactionStart { schema, .. }) => schema,
+ Statement::TransactionEnd(TransactionEnd { schema, .. }) => schema,
+ Statement::SetVariable(SetVariable { schema, .. }) => schema,
+ }
+ }
+
+ /// Return a descriptive string describing the type of this
+ /// [`Statement`]
+ pub fn name(&self) -> &str {
+ match self {
+ Statement::TransactionStart(_) => "TransactionStart",
+ Statement::TransactionEnd(_) => "TransactionEnd",
+ Statement::SetVariable(_) => "SetVariable",
+ }
+ }
+
+ /// Return a `format`able structure with a human readable
+ /// description of this LogicalPlan node per node, not including
+ /// children.
+ ///
+ /// See [LogicalPlan::display] for an example
+ pub fn display(&self) -> impl fmt::Display + '_ {
+ struct Wrapper<'a>(&'a Statement);
+ impl<'a> Display for Wrapper<'a> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match self.0 {
+ Statement::TransactionStart(TransactionStart {
+ access_mode,
+ isolation_level,
+ ..
+ }) => {
+ write!(f, "TransactionStart: {access_mode:?} {isolation_level:?}")
+ }
+ Statement::TransactionEnd(TransactionEnd {
+ conclusion,
+ chain,
+ ..
+ }) => {
+ write!(f, "TransactionEnd: {conclusion:?} chain:={chain}")
+ }
+ Statement::SetVariable(SetVariable {
+ variable, value, ..
+ }) => {
+ write!(f, "SetVariable: set {variable:?} to {value:?}")
+ }
+ }
+ }
+ }
+ Wrapper(self)
+ }
+}
+
+/// The operator that modifies the content of a database (adapted from
+/// substrait WriteRel)
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct DmlStatement {
+ /// The table name
+ pub table_name: OwnedTableReference,
+ /// The schema of the table (must align with Rel input)
+ pub table_schema: DFSchemaRef,
+ /// The type of operation to perform
+ pub op: WriteOp,
+ /// The relation that determines the tuples to add/remove/modify; its schema must match table_schema
+ pub input: Arc<LogicalPlan>,
+}
+
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub enum WriteOp {
+ Insert,
+ Delete,
+ Update,
+ Ctas,
+}
+
+impl Display for WriteOp {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ WriteOp::Insert => write!(f, "Insert"),
+ WriteOp::Delete => write!(f, "Delete"),
+ WriteOp::Update => write!(f, "Update"),
+ WriteOp::Ctas => write!(f, "Ctas"),
+ }
+ }
+}
+
+/// Indicates if a transaction was committed or aborted
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub enum TransactionConclusion {
+ Commit,
+ Rollback,
+}
+
+/// Indicates if this transaction is allowed to write
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub enum TransactionAccessMode {
+ ReadOnly,
+ ReadWrite,
+}
+
+/// Indicates ANSI transaction isolation level
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub enum TransactionIsolationLevel {
+ ReadUncommitted,
+ ReadCommitted,
+ RepeatableRead,
+ Serializable,
+}
+
+/// Indicator that the following statements should be committed or rolled back atomically
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct TransactionStart {
+ /// indicates if transaction is allowed to write
+ pub access_mode: TransactionAccessMode,
+ /// indicates ANSI isolation level
+ pub isolation_level: TransactionIsolationLevel,
+ /// Empty schema
+ pub schema: DFSchemaRef,
+}
+
+/// Indicator that any current transaction should be terminated
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct TransactionEnd {
+ /// whether the transaction committed or aborted
+ pub conclusion: TransactionConclusion,
+ /// if specified a new transaction is immediately started with same characteristics
+ pub chain: bool,
+ /// Empty schema
+ pub schema: DFSchemaRef,
+}
+
+/// Set a Variable's value -- value in
+/// [`ConfigOptions`](datafusion_common::config::ConfigOptions)
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct SetVariable {
+ /// The variable name
+ pub variable: String,
+ /// The value to set
+ pub value: String,
+ /// Dummy schema
+ pub schema: DFSchemaRef,
+}
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 726365f574..f3975e8346 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -914,9 +914,7 @@ pub fn from_plan(
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::DropView(_)
- | LogicalPlan::TransactionStart(_)
- | LogicalPlan::TransactionEnd(_)
- | LogicalPlan::SetVariable(_)
+ | LogicalPlan::Statement(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_) => {
// All of these plan types have no inputs / exprs so should not be called
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index d3fba585fe..719b17110b 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -237,9 +237,7 @@ impl OptimizerRule for CommonSubexprEliminate {
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::DropView(_)
- | LogicalPlan::TransactionStart(_)
- | LogicalPlan::TransactionEnd(_)
- | LogicalPlan::SetVariable(_)
+ | LogicalPlan::Statement(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::Extension(_)
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 936ce4984c..ff86fc8634 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -1360,18 +1360,12 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlan::DropView(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for DropView",
)),
- LogicalPlan::SetVariable(_) => Err(proto_error(
- "LogicalPlan serde is not yet implemented for SetVariable",
+ LogicalPlan::Statement(_) => Err(proto_error(
+ "LogicalPlan serde is not yet implemented for Statement",
)),
LogicalPlan::Dml(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for Dml",
)),
- LogicalPlan::TransactionStart(_) => Err(proto_error(
- "LogicalPlan serde is not yet implemented for Transactions",
- )),
- LogicalPlan::TransactionEnd(_) => Err(proto_error(
- "LogicalPlan serde is not yet implemented for Transactions",
- )),
LogicalPlan::DescribeTable(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for DescribeTable",
)),
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index b396976186..ef1ecfef58 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -30,17 +30,15 @@ use datafusion_common::{
};
use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
use datafusion_expr::logical_plan::builder::project;
-use datafusion_expr::logical_plan::{
- Analyze, Prepare, TransactionAccessMode, TransactionConclusion, TransactionEnd,
- TransactionIsolationLevel, TransactionStart,
-};
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{
- cast, col, CreateCatalog, CreateCatalogSchema,
+ cast, col, Analyze, CreateCatalog, CreateCatalogSchema,
CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, CreateView,
DescribeTable, DmlStatement, DropTable, DropView, EmptyRelation, Explain,
- ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, PlanType, SetVariable,
- ToStringifiedPlan, WriteOp,
+ ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, PlanType, Prepare,
+ SetVariable, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
+ TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
+ WriteOp,
};
use sqlparser::ast;
use sqlparser::ast::{
@@ -435,25 +433,28 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
TransactionAccessMode::ReadWrite
}
};
- Ok(LogicalPlan::TransactionStart(TransactionStart {
+ let statement = PlanStatement::TransactionStart(TransactionStart {
access_mode,
isolation_level,
schema: DFSchemaRef::new(DFSchema::empty()),
- }))
+ });
+ Ok(LogicalPlan::Statement(statement))
}
Statement::Commit { chain } => {
- Ok(LogicalPlan::TransactionEnd(TransactionEnd {
+ let statement = PlanStatement::TransactionEnd(TransactionEnd {
conclusion: TransactionConclusion::Commit,
chain,
schema: DFSchemaRef::new(DFSchema::empty()),
- }))
+ });
+ Ok(LogicalPlan::Statement(statement))
}
Statement::Rollback { chain } => {
- Ok(LogicalPlan::TransactionEnd(TransactionEnd {
+ let statement = PlanStatement::TransactionEnd(TransactionEnd {
conclusion: TransactionConclusion::Rollback,
chain,
schema: DFSchemaRef::new(DFSchema::empty()),
- }))
+ });
+ Ok(LogicalPlan::Statement(statement))
}
_ => Err(DataFusionError::NotImplemented(format!(
@@ -736,11 +737,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
};
- Ok(LogicalPlan::SetVariable(SetVariable {
+ let statement = PlanStatement::SetVariable(SetVariable {
variable: variable_lower,
value: value_string,
schema: DFSchemaRef::new(DFSchema::empty()),
- }))
+ });
+
+ Ok(LogicalPlan::Statement(statement))
}
fn delete_to_plan(