You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/04/04 14:15:18 UTC

[arrow-datafusion] branch main updated: Move `TransactionStart`/`TransactionEnd`/`SetVariable` into `LogicalPlan::Statement` (#5842)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6dfbe48328 Move `TransactionStart`/`TransactionEnd`/`SetVariable` into `LogicalPlan::Statement` (#5842)
6dfbe48328 is described below

commit 6dfbe483289ef0b10c82ffa98e3be8b5201acb78
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Apr 4 16:15:11 2023 +0200

    Move `TransactionStart`/`TransactionEnd`/`SetVariable` into `LogicalPlan::Statement` (#5842)
---
 datafusion/core/src/execution/context.rs           |  12 +-
 datafusion/core/src/physical_plan/planner.rs       |  16 +-
 datafusion/expr/src/lib.rs                         |  12 +-
 datafusion/expr/src/logical_plan/mod.rs            |  16 +-
 datafusion/expr/src/logical_plan/plan.rs           | 141 ++--------------
 datafusion/expr/src/logical_plan/statement.rs      | 188 +++++++++++++++++++++
 datafusion/expr/src/utils.rs                       |   4 +-
 .../optimizer/src/common_subexpr_eliminate.rs      |   4 +-
 datafusion/proto/src/logical_plan/mod.rs           |  10 +-
 datafusion/sql/src/statement.rs                    |  33 ++--
 10 files changed, 251 insertions(+), 185 deletions(-)

diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 9ff7c66004..eb9f6ba3c4 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -31,7 +31,9 @@ use crate::{
         optimizer::PhysicalOptimizerRule,
     },
 };
-use datafusion_expr::{DescribeTable, DmlStatement, StringifiedPlan, WriteOp};
+use datafusion_expr::{
+    logical_plan::Statement, DescribeTable, DmlStatement, StringifiedPlan, WriteOp,
+};
 pub use datafusion_physical_expr::execution_props::ExecutionProps;
 use datafusion_physical_expr::var_provider::is_system_variables;
 use parking_lot::RwLock;
@@ -444,9 +446,11 @@ impl SessionContext {
                 }
             }
 
-            LogicalPlan::SetVariable(SetVariable {
-                variable, value, ..
-            }) => {
+            LogicalPlan::Statement(Statement::SetVariable(SetVariable {
+                variable,
+                value,
+                ..
+            })) => {
                 let mut state = self.state.write();
                 state.config.options_mut().set(&variable, &value)?;
                 drop(state);
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index f9a9209454..25afdb4d1c 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1156,21 +1156,11 @@ impl DefaultPhysicalPlanner {
                         "Unsupported logical plan: Dml".to_string(),
                     ))
                 }
-                LogicalPlan::TransactionStart(_) => {
+                LogicalPlan::Statement(statement) => {
                     // DataFusion is a read-only query engine, but also a library, so consumers may implement this
+                    let name = statement.name();
                     Err(DataFusionError::NotImplemented(
-                        "Unsupported logical plan: TransactionStart".to_string(),
-                    ))
-                }
-                LogicalPlan::TransactionEnd(_) => {
-                    // DataFusion is a read-only query engine, but also a library, so consumers may implement this
-                    Err(DataFusionError::NotImplemented(
-                        "Unsupported logical plan: TransactionEnd".to_string(),
-                    ))
-                }
-                LogicalPlan::SetVariable(_) => {
-                    Err(DataFusionError::Internal(
-                        "Unsupported logical plan: SetVariable must be root of the plan".to_string(),
+                        format!("Unsupported logical plan: Statement({name})")
                     ))
                 }
                 LogicalPlan::DescribeTable(_) => {
diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs
index 8291f6f34b..0d5963021e 100644
--- a/datafusion/expr/src/lib.rs
+++ b/datafusion/expr/src/lib.rs
@@ -70,13 +70,15 @@ pub use logical_plan::{
     builder::{
         build_join_schema, union, wrap_projection_for_join_if_necessary, UNNAMED_TABLE,
     },
-    Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
+    Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
     CreateMemoryTable, CreateView, CrossJoin, DescribeTable, Distinct, DmlStatement,
     DropTable, DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint,
-    JoinType, Limit, LogicalPlan, LogicalPlanBuilder, Partitioning, PlanType, Projection,
-    Repartition, SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
-    ToStringifiedPlan, Union, Unnest, UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
-    Values, Window, WriteOp,
+    JoinType, Limit, LogicalPlan, LogicalPlanBuilder, Partitioning, PlanType, Prepare,
+    Projection, Repartition, SetVariable, Sort, Statement, StringifiedPlan, Subquery,
+    SubqueryAlias, TableScan, ToStringifiedPlan, TransactionAccessMode,
+    TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
+    Union, Unnest, UserDefinedLogicalNode, UserDefinedLogicalNodeCore, Values, Window,
+    WriteOp,
 };
 pub use nullif::SUPPORTED_NULLIF_TYPES;
 pub use operator::Operator;
diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs
index 764d02547b..d2b0410097 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -19,16 +19,20 @@ pub mod builder;
 pub mod display;
 mod extension;
 mod plan;
+mod statement;
 
 pub use builder::{table_scan, LogicalPlanBuilder};
 pub use plan::{
     Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
-    CreateMemoryTable, CreateView, CrossJoin, DescribeTable, Distinct, DmlStatement,
-    DropTable, DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint,
-    JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare, Projection,
-    Repartition, SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
-    ToStringifiedPlan, TransactionAccessMode, TransactionConclusion, TransactionEnd,
-    TransactionIsolationLevel, TransactionStart, Union, Unnest, Values, Window, WriteOp,
+    CreateMemoryTable, CreateView, CrossJoin, DescribeTable, Distinct, DropTable,
+    DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType,
+    Limit, LogicalPlan, Partitioning, PlanType, Prepare, Projection, Repartition, Sort,
+    StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
+    Unnest, Values, Window,
+};
+pub use statement::{
+    DmlStatement, SetVariable, Statement, TransactionAccessMode, TransactionConclusion,
+    TransactionEnd, TransactionIsolationLevel, TransactionStart, WriteOp,
 };
 
 pub use display::display_schema;
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index b4fc9edd6b..e527a0a70c 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -19,6 +19,8 @@
 use crate::logical_plan::builder::validate_unique_names;
 use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
 use crate::logical_plan::extension::UserDefinedLogicalNode;
+use crate::logical_plan::statement::{DmlStatement, Statement};
+
 use crate::logical_plan::plan;
 use crate::utils::{
     enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs, from_plan,
@@ -88,6 +90,8 @@ pub enum LogicalPlan {
     SubqueryAlias(SubqueryAlias),
     /// Skip some number of rows, and then fetch some number of rows.
     Limit(Limit),
+    /// [`Statement`]
+    Statement(Statement),
     /// Creates an external table.
     CreateExternalTable(CreateExternalTable),
     /// Creates an in memory table.
@@ -116,8 +120,6 @@ pub enum LogicalPlan {
     Extension(Extension),
     /// Remove duplicate rows from the input
     Distinct(Distinct),
-    /// Set a Variable
-    SetVariable(SetVariable),
     /// Prepare a statement
     Prepare(Prepare),
     /// Insert / Update / Delete
@@ -126,10 +128,6 @@ pub enum LogicalPlan {
     DescribeTable(DescribeTable),
     /// Unnest a column that contains a nested list type.
     Unnest(Unnest),
-    // Begin a transaction
-    TransactionStart(TransactionStart),
-    // Commit or rollback a transaction
-    TransactionEnd(TransactionEnd),
 }
 
 impl LogicalPlan {
@@ -151,6 +149,7 @@ impl LogicalPlan {
             LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
             LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
             LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
+            LogicalPlan::Statement(statement) => statement.schema(),
             LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
             LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
             LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
@@ -169,14 +168,11 @@ impl LogicalPlan {
             LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => schema,
             LogicalPlan::DropTable(DropTable { schema, .. }) => schema,
             LogicalPlan::DropView(DropView { schema, .. }) => schema,
-            LogicalPlan::SetVariable(SetVariable { schema, .. }) => schema,
             LogicalPlan::DescribeTable(DescribeTable { dummy_schema, .. }) => {
                 dummy_schema
             }
             LogicalPlan::Dml(DmlStatement { table_schema, .. }) => table_schema,
             LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
-            LogicalPlan::TransactionStart(TransactionStart { schema, .. }) => schema,
-            LogicalPlan::TransactionEnd(TransactionEnd { schema, .. }) => schema,
         }
     }
 
@@ -243,12 +239,10 @@ impl LogicalPlan {
                 self.inputs().iter().map(|p| p.schema()).collect()
             }
             // return empty
-            LogicalPlan::DropTable(_)
+            LogicalPlan::Statement(_)
+            | LogicalPlan::DropTable(_)
             | LogicalPlan::DropView(_)
-            | LogicalPlan::DescribeTable(_)
-            | LogicalPlan::TransactionStart(_)
-            | LogicalPlan::TransactionEnd(_)
-            | LogicalPlan::SetVariable(_) => vec![],
+            | LogicalPlan::DescribeTable(_) => vec![],
         }
     }
 
@@ -362,15 +356,13 @@ impl LogicalPlan {
             | LogicalPlan::Subquery(_)
             | LogicalPlan::SubqueryAlias(_)
             | LogicalPlan::Limit(_)
+            | LogicalPlan::Statement(_)
             | LogicalPlan::CreateExternalTable(_)
             | LogicalPlan::CreateMemoryTable(_)
             | LogicalPlan::CreateView(_)
             | LogicalPlan::CreateCatalogSchema(_)
             | LogicalPlan::CreateCatalog(_)
             | LogicalPlan::DropTable(_)
-            | LogicalPlan::TransactionStart(_)
-            | LogicalPlan::TransactionEnd(_)
-            | LogicalPlan::SetVariable(_)
             | LogicalPlan::DropView(_)
             | LogicalPlan::CrossJoin(_)
             | LogicalPlan::Analyze(_)
@@ -414,15 +406,13 @@ impl LogicalPlan {
             LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
             // plans without inputs
             LogicalPlan::TableScan { .. }
+            | LogicalPlan::Statement { .. }
             | LogicalPlan::EmptyRelation { .. }
             | LogicalPlan::Values { .. }
             | LogicalPlan::CreateExternalTable(_)
             | LogicalPlan::CreateCatalogSchema(_)
             | LogicalPlan::CreateCatalog(_)
             | LogicalPlan::DropTable(_)
-            | LogicalPlan::TransactionStart(_)
-            | LogicalPlan::TransactionEnd(_)
-            | LogicalPlan::SetVariable(_)
             | LogicalPlan::DropView(_)
             | LogicalPlan::DescribeTable(_) => vec![],
         }
@@ -1039,6 +1029,9 @@ impl LogicalPlan {
                     LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
                         write!(f, "SubqueryAlias: {alias}")
                     }
+                    LogicalPlan::Statement(statement) => {
+                        write!(f, "{}", statement.display())
+                    }
                     LogicalPlan::CreateExternalTable(CreateExternalTable {
                         ref name,
                         ..
@@ -1077,30 +1070,11 @@ impl LogicalPlan {
                     }) => {
                         write!(f, "DropTable: {name:?} if not exist:={if_exists}")
                     }
-                    LogicalPlan::TransactionStart(TransactionStart {
-                        access_mode,
-                        isolation_level,
-                        ..
-                    }) => {
-                        write!(f, "TransactionStart: {access_mode:?} {isolation_level:?}")
-                    }
-                    LogicalPlan::TransactionEnd(TransactionEnd {
-                        conclusion,
-                        chain,
-                        ..
-                    }) => {
-                        write!(f, "TransactionEnd: {conclusion:?} chain:={chain}")
-                    }
                     LogicalPlan::DropView(DropView {
                         name, if_exists, ..
                     }) => {
                         write!(f, "DropView: {name:?} if not exist:={if_exists}")
                     }
-                    LogicalPlan::SetVariable(SetVariable {
-                        variable, value, ..
-                    }) => {
-                        write!(f, "SetVariable: set {variable:?} to {value:?}")
-                    }
                     LogicalPlan::Distinct(Distinct { .. }) => {
                         write!(f, "Distinct:")
                     }
@@ -1234,18 +1208,6 @@ pub struct DropView {
     pub schema: DFSchemaRef,
 }
 
-/// Set a Variable's value -- value in
-/// [`ConfigOptions`](datafusion_common::config::ConfigOptions)
-#[derive(Clone, PartialEq, Eq, Hash)]
-pub struct SetVariable {
-    /// The variable name
-    pub variable: String,
-    /// The value to set
-    pub value: String,
-    /// Dummy schema
-    pub schema: DFSchemaRef,
-}
-
 /// Produces no rows: An empty relation with an empty schema
 #[derive(Clone, PartialEq, Eq, Hash)]
 pub struct EmptyRelation {
@@ -1569,83 +1531,6 @@ impl Hash for CreateExternalTable {
     }
 }
 
-#[derive(Clone, PartialEq, Eq, Hash)]
-pub enum WriteOp {
-    Insert,
-    Delete,
-    Update,
-    Ctas,
-}
-
-impl Display for WriteOp {
-    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
-        match self {
-            WriteOp::Insert => write!(f, "Insert"),
-            WriteOp::Delete => write!(f, "Delete"),
-            WriteOp::Update => write!(f, "Update"),
-            WriteOp::Ctas => write!(f, "Ctas"),
-        }
-    }
-}
-
-/// The operator that modifies the content of a database (adapted from substrait WriteRel)
-#[derive(Clone, PartialEq, Eq, Hash)]
-pub struct DmlStatement {
-    /// The table name
-    pub table_name: OwnedTableReference,
-    /// The schema of the table (must align with Rel input)
-    pub table_schema: DFSchemaRef,
-    /// The type of operation to perform
-    pub op: WriteOp,
-    /// The relation that determines the tuples to add/remove/modify the schema must match with table_schema
-    pub input: Arc<LogicalPlan>,
-}
-
-/// Indicates if a transaction was committed or aborted
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
-pub enum TransactionConclusion {
-    Commit,
-    Rollback,
-}
-
-/// Indicates if this transaction is allowed to write
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
-pub enum TransactionAccessMode {
-    ReadOnly,
-    ReadWrite,
-}
-
-/// Indicates ANSI transaction isolation level
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
-pub enum TransactionIsolationLevel {
-    ReadUncommitted,
-    ReadCommitted,
-    RepeatableRead,
-    Serializable,
-}
-
-/// Indicator that the following statements should be committed or rolled back atomically
-#[derive(Clone, PartialEq, Eq, Hash)]
-pub struct TransactionStart {
-    /// indicates if transaction is allowed to write
-    pub access_mode: TransactionAccessMode,
-    // indicates ANSI isolation level
-    pub isolation_level: TransactionIsolationLevel,
-    /// Empty schema
-    pub schema: DFSchemaRef,
-}
-
-/// Indicator that any current transaction should be terminated
-#[derive(Clone, PartialEq, Eq, Hash)]
-pub struct TransactionEnd {
-    /// whether the transaction committed or aborted
-    pub conclusion: TransactionConclusion,
-    /// if specified a new transaction is immediately started with same characteristics
-    pub chain: bool,
-    /// Empty schema
-    pub schema: DFSchemaRef,
-}
-
 /// Prepare a statement but do not execute it. Prepare statements can have 0 or more
 /// `Expr::Placeholder` expressions that are filled in during execution
 #[derive(Clone, PartialEq, Eq, Hash)]
diff --git a/datafusion/expr/src/logical_plan/statement.rs b/datafusion/expr/src/logical_plan/statement.rs
new file mode 100644
index 0000000000..dad09d996b
--- /dev/null
+++ b/datafusion/expr/src/logical_plan/statement.rs
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    fmt::{self, Display},
+    sync::Arc,
+};
+
+use datafusion_common::{DFSchemaRef, OwnedTableReference};
+
+use crate::LogicalPlan;
+
+/// Various types of Statements.
+///
+/// # Transactions:
+///
+/// While DataFusion does not offer support transactions, it provides
+/// [`LogicalPlan`](crate::LogicalPlan) support to assist building
+/// database systems using DataFusion
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub enum Statement {
+    // Begin a transaction
+    TransactionStart(TransactionStart),
+    // Commit or rollback a transaction
+    TransactionEnd(TransactionEnd),
+    /// Set a Variable
+    SetVariable(SetVariable),
+}
+
+impl Statement {
+    /// Get a reference to the logical plan's schema
+    pub fn schema(&self) -> &DFSchemaRef {
+        match self {
+            Statement::TransactionStart(TransactionStart { schema, .. }) => schema,
+            Statement::TransactionEnd(TransactionEnd { schema, .. }) => schema,
+            Statement::SetVariable(SetVariable { schema, .. }) => schema,
+        }
+    }
+
+    /// Return a descriptive string describing the type of this
+    /// [`Statement`]
+    pub fn name(&self) -> &str {
+        match self {
+            Statement::TransactionStart(_) => "TransactionStart",
+            Statement::TransactionEnd(_) => "TransactionEnd",
+            Statement::SetVariable(_) => "SetVariable",
+        }
+    }
+
+    /// Return a `format`able structure with the a human readable
+    /// description of this LogicalPlan node per node, not including
+    /// children.
+    ///
+    /// See [LogicalPlan::display] for an example
+    pub fn display(&self) -> impl fmt::Display + '_ {
+        struct Wrapper<'a>(&'a Statement);
+        impl<'a> Display for Wrapper<'a> {
+            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+                match self.0 {
+                    Statement::TransactionStart(TransactionStart {
+                        access_mode,
+                        isolation_level,
+                        ..
+                    }) => {
+                        write!(f, "TransactionStart: {access_mode:?} {isolation_level:?}")
+                    }
+                    Statement::TransactionEnd(TransactionEnd {
+                        conclusion,
+                        chain,
+                        ..
+                    }) => {
+                        write!(f, "TransactionEnd: {conclusion:?} chain:={chain}")
+                    }
+                    Statement::SetVariable(SetVariable {
+                        variable, value, ..
+                    }) => {
+                        write!(f, "SetVariable: set {variable:?} to {value:?}")
+                    }
+                }
+            }
+        }
+        Wrapper(self)
+    }
+}
+
+/// The operator that modifies the content of a database (adapted from
+/// substrait WriteRel)
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct DmlStatement {
+    /// The table name
+    pub table_name: OwnedTableReference,
+    /// The schema of the table (must align with Rel input)
+    pub table_schema: DFSchemaRef,
+    /// The type of operation to perform
+    pub op: WriteOp,
+    /// The relation that determines the tuples to add/remove/modify the schema must match with table_schema
+    pub input: Arc<LogicalPlan>,
+}
+
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub enum WriteOp {
+    Insert,
+    Delete,
+    Update,
+    Ctas,
+}
+
+impl Display for WriteOp {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            WriteOp::Insert => write!(f, "Insert"),
+            WriteOp::Delete => write!(f, "Delete"),
+            WriteOp::Update => write!(f, "Update"),
+            WriteOp::Ctas => write!(f, "Ctas"),
+        }
+    }
+}
+
+/// Indicates if a transaction was committed or aborted
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub enum TransactionConclusion {
+    Commit,
+    Rollback,
+}
+
+/// Indicates if this transaction is allowed to write
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub enum TransactionAccessMode {
+    ReadOnly,
+    ReadWrite,
+}
+
+/// Indicates ANSI transaction isolation level
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub enum TransactionIsolationLevel {
+    ReadUncommitted,
+    ReadCommitted,
+    RepeatableRead,
+    Serializable,
+}
+
+/// Indicator that the following statements should be committed or rolled back atomically
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct TransactionStart {
+    /// indicates if transaction is allowed to write
+    pub access_mode: TransactionAccessMode,
+    // indicates ANSI isolation level
+    pub isolation_level: TransactionIsolationLevel,
+    /// Empty schema
+    pub schema: DFSchemaRef,
+}
+
+/// Indicator that any current transaction should be terminated
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct TransactionEnd {
+    /// whether the transaction committed or aborted
+    pub conclusion: TransactionConclusion,
+    /// if specified a new transaction is immediately started with same characteristics
+    pub chain: bool,
+    /// Empty schema
+    pub schema: DFSchemaRef,
+}
+
+/// Set a Variable's value -- value in
+/// [`ConfigOptions`](datafusion_common::config::ConfigOptions)
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct SetVariable {
+    /// The variable name
+    pub variable: String,
+    /// The value to set
+    pub value: String,
+    /// Dummy schema
+    pub schema: DFSchemaRef,
+}
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 726365f574..f3975e8346 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -914,9 +914,7 @@ pub fn from_plan(
         | LogicalPlan::CreateExternalTable(_)
         | LogicalPlan::DropTable(_)
         | LogicalPlan::DropView(_)
-        | LogicalPlan::TransactionStart(_)
-        | LogicalPlan::TransactionEnd(_)
-        | LogicalPlan::SetVariable(_)
+        | LogicalPlan::Statement(_)
         | LogicalPlan::CreateCatalogSchema(_)
         | LogicalPlan::CreateCatalog(_) => {
             // All of these plan types have no inputs / exprs so should not be called
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index d3fba585fe..719b17110b 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -237,9 +237,7 @@ impl OptimizerRule for CommonSubexprEliminate {
             | LogicalPlan::CreateCatalog(_)
             | LogicalPlan::DropTable(_)
             | LogicalPlan::DropView(_)
-            | LogicalPlan::TransactionStart(_)
-            | LogicalPlan::TransactionEnd(_)
-            | LogicalPlan::SetVariable(_)
+            | LogicalPlan::Statement(_)
             | LogicalPlan::DescribeTable(_)
             | LogicalPlan::Distinct(_)
             | LogicalPlan::Extension(_)
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 936ce4984c..ff86fc8634 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -1360,18 +1360,12 @@ impl AsLogicalPlan for LogicalPlanNode {
             LogicalPlan::DropView(_) => Err(proto_error(
                 "LogicalPlan serde is not yet implemented for DropView",
             )),
-            LogicalPlan::SetVariable(_) => Err(proto_error(
-                "LogicalPlan serde is not yet implemented for SetVariable",
+            LogicalPlan::Statement(_) => Err(proto_error(
+                "LogicalPlan serde is not yet implemented for Statement",
             )),
             LogicalPlan::Dml(_) => Err(proto_error(
                 "LogicalPlan serde is not yet implemented for Dml",
             )),
-            LogicalPlan::TransactionStart(_) => Err(proto_error(
-                "LogicalPlan serde is not yet implemented for Transactions",
-            )),
-            LogicalPlan::TransactionEnd(_) => Err(proto_error(
-                "LogicalPlan serde is not yet implemented for Transactions",
-            )),
             LogicalPlan::DescribeTable(_) => Err(proto_error(
                 "LogicalPlan serde is not yet implemented for DescribeTable",
             )),
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index b396976186..ef1ecfef58 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -30,17 +30,15 @@ use datafusion_common::{
 };
 use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
 use datafusion_expr::logical_plan::builder::project;
-use datafusion_expr::logical_plan::{
-    Analyze, Prepare, TransactionAccessMode, TransactionConclusion, TransactionEnd,
-    TransactionIsolationLevel, TransactionStart,
-};
 use datafusion_expr::utils::expr_to_columns;
 use datafusion_expr::{
-    cast, col, CreateCatalog, CreateCatalogSchema,
+    cast, col, Analyze, CreateCatalog, CreateCatalogSchema,
     CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, CreateView,
     DescribeTable, DmlStatement, DropTable, DropView, EmptyRelation, Explain,
-    ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, PlanType, SetVariable,
-    ToStringifiedPlan, WriteOp,
+    ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, PlanType, Prepare,
+    SetVariable, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
+    TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
+    WriteOp,
 };
 use sqlparser::ast;
 use sqlparser::ast::{
@@ -435,25 +433,28 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                         TransactionAccessMode::ReadWrite
                     }
                 };
-                Ok(LogicalPlan::TransactionStart(TransactionStart {
+                let statement = PlanStatement::TransactionStart(TransactionStart {
                     access_mode,
                     isolation_level,
                     schema: DFSchemaRef::new(DFSchema::empty()),
-                }))
+                });
+                Ok(LogicalPlan::Statement(statement))
             }
             Statement::Commit { chain } => {
-                Ok(LogicalPlan::TransactionEnd(TransactionEnd {
+                let statement = PlanStatement::TransactionEnd(TransactionEnd {
                     conclusion: TransactionConclusion::Commit,
                     chain,
                     schema: DFSchemaRef::new(DFSchema::empty()),
-                }))
+                });
+                Ok(LogicalPlan::Statement(statement))
             }
             Statement::Rollback { chain } => {
-                Ok(LogicalPlan::TransactionEnd(TransactionEnd {
+                let statement = PlanStatement::TransactionEnd(TransactionEnd {
                     conclusion: TransactionConclusion::Rollback,
                     chain,
                     schema: DFSchemaRef::new(DFSchema::empty()),
-                }))
+                });
+                Ok(LogicalPlan::Statement(statement))
             }
 
             _ => Err(DataFusionError::NotImplemented(format!(
@@ -736,11 +737,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             }
         };
 
-        Ok(LogicalPlan::SetVariable(SetVariable {
+        let statement = PlanStatement::SetVariable(SetVariable {
             variable: variable_lower,
             value: value_string,
             schema: DFSchemaRef::new(DFSchema::empty()),
-        }))
+        });
+
+        Ok(LogicalPlan::Statement(statement))
     }
 
     fn delete_to_plan(