You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/03 14:35:11 UTC

[arrow-datafusion] branch master updated: support `SET` variable (#4069)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new c9442ce6e support `SET` variable (#4069)
c9442ce6e is described below

commit c9442ce6ecf7efeb238c6c25c0519350d0ba3839
Author: Wei-Ting Kuo <wa...@gmail.com>
AuthorDate: Thu Nov 3 22:35:04 2022 +0800

    support `SET` variable (#4069)
    
    * support SET
    
    * remove useless comment
    
    * add test cases
    
    * Update datafusion/core/src/config.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * Update datafusion/core/src/execution/context.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * fix test cases
    
    * fmt
    
    * Update datafusion/expr/src/logical_plan/plan.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 datafusion/core/src/config.rs                      |   5 +
 datafusion/core/src/execution/context.rs           |  56 +++-
 datafusion/core/src/physical_plan/planner.rs       |   5 +
 datafusion/core/tests/sql/mod.rs                   |   1 +
 datafusion/core/tests/sql/set_variable.rs          | 296 +++++++++++++++++++++
 datafusion/expr/src/lib.rs                         |   2 +-
 datafusion/expr/src/logical_plan/mod.rs            |   6 +-
 datafusion/expr/src/logical_plan/plan.rs           |  26 +-
 datafusion/expr/src/utils.rs                       |   1 +
 .../optimizer/src/common_subexpr_eliminate.rs      |   1 +
 datafusion/optimizer/src/projection_push_down.rs   |   1 +
 datafusion/proto/src/logical_plan.rs               |   3 +
 datafusion/sql/src/planner.rs                      |  87 +++++-
 13 files changed, 483 insertions(+), 7 deletions(-)

diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index d124fd94f..ea9fa765e 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -360,6 +360,11 @@ impl ConfigOptions {
         self.set(key, ScalarValue::UInt64(Some(value)))
     }
 
+    /// set a `String` configuration option
+    pub fn set_string(&mut self, key: &str, value: impl Into<String>) {
+        self.set(key, ScalarValue::Utf8(Some(value.into())))
+    }
+
     /// get a configuration option
     pub fn get(&self, key: &str) -> Option<ScalarValue> {
         self.options.get(key).cloned()
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index c66199e07..d4962f873 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -67,7 +67,7 @@ use crate::error::{DataFusionError, Result};
 use crate::logical_expr::{
     CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
     CreateView, DropTable, DropView, Explain, LogicalPlan, LogicalPlanBuilder,
-    TableSource, TableType, UNNAMED_TABLE,
+    SetVariable, TableSource, TableType, UNNAMED_TABLE,
 };
 use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule};
 use datafusion_sql::{ResolvedTableReference, TableReference};
@@ -341,6 +341,60 @@ impl SessionContext {
                     ))),
                 }
             }
+
+            LogicalPlan::SetVariable(SetVariable {
+                variable, value, ..
+            }) => {
+                let config_options = &self.state.write().config.config_options;
+
+                let old_value =
+                    config_options.read().get(&variable).ok_or_else(|| {
+                        DataFusionError::Execution(format!(
+                            "Can not SET variable: Unknown Variable {}",
+                            variable
+                        ))
+                    })?;
+
+                match old_value {
+                    ScalarValue::Boolean(_) => {
+                        let new_value = value.parse::<bool>().map_err(|_| {
+                            DataFusionError::Execution(format!(
+                                "Failed to parse {} as bool",
+                                value,
+                            ))
+                        })?;
+                        config_options.write().set_bool(&variable, new_value);
+                    }
+
+                    ScalarValue::UInt64(_) => {
+                        let new_value = value.parse::<u64>().map_err(|_| {
+                            DataFusionError::Execution(format!(
+                                "Failed to parse {} as u64",
+                                value,
+                            ))
+                        })?;
+                        config_options.write().set_u64(&variable, new_value);
+                    }
+
+                    ScalarValue::Utf8(_) => {
+                        let new_value = value.parse::<String>().map_err(|_| {
+                            DataFusionError::Execution(format!(
+                                "Failed to parse {} as String",
+                                value,
+                            ))
+                        })?;
+                        config_options.write().set_string(&variable, new_value);
+                    }
+
+                    _ => {
+                        return Err(DataFusionError::Execution(
+                            "Unsupported Scalar Value Type".to_string(),
+                        ))
+                    }
+                }
+                self.return_empty_dataframe()
+            }
+
             LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
                 schema_name,
                 if_not_exists,
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 085677785..17e09497f 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1097,6 +1097,11 @@ impl DefaultPhysicalPlanner {
                         "Unsupported logical plan: CreateView".to_string(),
                     ))
                 }
+                LogicalPlan::SetVariable(_) => {
+                    Err(DataFusionError::Internal(
+                        "Unsupported logical plan: SetVariable must be root of the plan".to_string(),
+                    ))
+                }
                 LogicalPlan::Explain(_) => Err(DataFusionError::Internal(
                     "Unsupported logical plan: Explain must be root of the plan".to_string(),
                 )),
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index ede9fafa7..1562574dc 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -109,6 +109,7 @@ pub mod idenfifers;
 pub mod information_schema;
 pub mod parquet_schema;
 pub mod partitioned_csv;
+pub mod set_variable;
 pub mod subqueries;
 #[cfg(feature = "unicode_expressions")]
 pub mod unicode;
diff --git a/datafusion/core/tests/sql/set_variable.rs b/datafusion/core/tests/sql/set_variable.rs
new file mode 100644
index 000000000..2bbe49120
--- /dev/null
+++ b/datafusion/core/tests/sql/set_variable.rs
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::*;
+
+#[tokio::test]
+async fn set_variable_to_value() {
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    ctx.sql("SET datafusion.execution.batch_size to 1")
+        .await
+        .unwrap();
+    let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+---------------------------------+---------+",
+        "| name                            | setting |",
+        "+---------------------------------+---------+",
+        "| datafusion.execution.batch_size | 1       |",
+        "+---------------------------------+---------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+}
+
+#[tokio::test]
+async fn set_variable_to_value_with_equal_sign() {
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    ctx.sql("SET datafusion.execution.batch_size = 1")
+        .await
+        .unwrap();
+    let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+---------------------------------+---------+",
+        "| name                            | setting |",
+        "+---------------------------------+---------+",
+        "| datafusion.execution.batch_size | 1       |",
+        "+---------------------------------+---------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+}
+
+#[tokio::test]
+async fn set_variable_to_value_with_single_quoted_string() {
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    ctx.sql("SET datafusion.execution.batch_size to '1'")
+        .await
+        .unwrap();
+    let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+---------------------------------+---------+",
+        "| name                            | setting |",
+        "+---------------------------------+---------+",
+        "| datafusion.execution.batch_size | 1       |",
+        "+---------------------------------+---------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+}
+
+#[tokio::test]
+async fn set_variable_to_value_case_insensitive() {
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    ctx.sql("SET datafusion.EXECUTION.batch_size to '1'")
+        .await
+        .unwrap();
+    let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+---------------------------------+---------+",
+        "| name                            | setting |",
+        "+---------------------------------+---------+",
+        "| datafusion.execution.batch_size | 1       |",
+        "+---------------------------------+---------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+}
+
+#[tokio::test]
+async fn set_variable_unknown_variable() {
+    let ctx = SessionContext::new();
+
+    let err = plan_and_collect(&ctx, "SET aabbcc to '1'")
+        .await
+        .unwrap_err();
+    assert_eq!(
+        err.to_string(),
+        "Execution error: Can not SET variable: Unknown Variable aabbcc"
+    );
+}
+
+#[tokio::test]
+async fn set_bool_variable() {
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    ctx.sql("SET datafusion.execution.coalesce_batches to true")
+        .await
+        .unwrap();
+    let result = plan_and_collect(&ctx, "SHOW datafusion.execution.coalesce_batches")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+---------------------------------------+---------+",
+        "| name                                  | setting |",
+        "+---------------------------------------+---------+",
+        "| datafusion.execution.coalesce_batches | true    |",
+        "+---------------------------------------+---------+",
+    ];
+    assert_batches_eq!(expected, &result);
+
+    ctx.sql("SET datafusion.execution.coalesce_batches to 'false'")
+        .await
+        .unwrap();
+    let result = plan_and_collect(&ctx, "SHOW datafusion.execution.coalesce_batches")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+---------------------------------------+---------+",
+        "| name                                  | setting |",
+        "+---------------------------------------+---------+",
+        "| datafusion.execution.coalesce_batches | false   |",
+        "+---------------------------------------+---------+",
+    ];
+    assert_batches_eq!(expected, &result);
+}
+
+#[tokio::test]
+async fn set_bool_variable_bad_value() {
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    let err = plan_and_collect(&ctx, "SET datafusion.execution.coalesce_batches to 1")
+        .await
+        .unwrap_err();
+
+    assert_eq!(
+        err.to_string(),
+        "Execution error: Failed to parse 1 as bool"
+    );
+
+    let err = plan_and_collect(&ctx, "SET datafusion.execution.coalesce_batches to abc")
+        .await
+        .unwrap_err();
+
+    assert_eq!(
+        err.to_string(),
+        "Execution error: Failed to parse abc as bool"
+    );
+}
+
+#[tokio::test]
+async fn set_u64_variable() {
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    ctx.sql("SET datafusion.execution.batch_size to 0")
+        .await
+        .unwrap();
+    let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+---------------------------------+---------+",
+        "| name                            | setting |",
+        "+---------------------------------+---------+",
+        "| datafusion.execution.batch_size | 0       |",
+        "+---------------------------------+---------+",
+    ];
+    assert_batches_eq!(expected, &result);
+
+    ctx.sql("SET datafusion.execution.batch_size to '1'")
+        .await
+        .unwrap();
+    let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+---------------------------------+---------+",
+        "| name                            | setting |",
+        "+---------------------------------+---------+",
+        "| datafusion.execution.batch_size | 1       |",
+        "+---------------------------------+---------+",
+    ];
+    assert_batches_eq!(expected, &result);
+
+    ctx.sql("SET datafusion.execution.batch_size to +2")
+        .await
+        .unwrap();
+    let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+---------------------------------+---------+",
+        "| name                            | setting |",
+        "+---------------------------------+---------+",
+        "| datafusion.execution.batch_size | 2       |",
+        "+---------------------------------+---------+",
+    ];
+    assert_batches_eq!(expected, &result);
+}
+
+#[tokio::test]
+async fn set_u64_variable_bad_value() {
+    let ctx =
+        SessionContext::with_config(SessionConfig::new().with_information_schema(true));
+
+    let err = plan_and_collect(&ctx, "SET datafusion.execution.batch_size to -1")
+        .await
+        .unwrap_err();
+
+    assert_eq!(
+        err.to_string(),
+        "Execution error: Failed to parse -1 as u64"
+    );
+
+    let err = plan_and_collect(&ctx, "SET datafusion.execution.batch_size to abc")
+        .await
+        .unwrap_err();
+
+    assert_eq!(
+        err.to_string(),
+        "Execution error: Failed to parse abc as u64"
+    );
+
+    let err = plan_and_collect(&ctx, "SET datafusion.execution.batch_size to 0.1")
+        .await
+        .unwrap_err();
+
+    assert_eq!(
+        err.to_string(),
+        "Execution error: Failed to parse 0.1 as u64"
+    );
+}
+
+#[tokio::test]
+async fn set_time_zone() {
+    // we don't support changing time zone for now until all time zone issues fixed and related function completed
+
+    let ctx = SessionContext::new();
+
+    // for full variable name
+    let err = plan_and_collect(&ctx, "set datafusion.execution.time_zone = '8'")
+        .await
+        .unwrap_err();
+
+    assert_eq!(
+        err.to_string(),
+        "Error during planning: Changing Time Zone isn't supported yet"
+    );
+
+    // for alias time zone
+    let err = plan_and_collect(&ctx, "set time zone = '8'")
+        .await
+        .unwrap_err();
+
+    assert_eq!(
+        err.to_string(),
+        "Error during planning: Changing Time Zone isn't supported yet"
+    );
+
+    // for alias timezone
+    let err = plan_and_collect(&ctx, "set timezone = '8'")
+        .await
+        .unwrap_err();
+
+    assert_eq!(
+        err.to_string(),
+        "Error during planning: Changing Time Zone isn't supported yet"
+    );
+}
diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs
index 874a80713..45c281551 100644
--- a/datafusion/expr/src/lib.rs
+++ b/datafusion/expr/src/lib.rs
@@ -72,7 +72,7 @@ pub use logical_plan::{
     CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, DropView,
     EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit,
     LogicalPlan, LogicalPlanBuilder, Partitioning, PlanType, PlanVisitor, Projection,
-    Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
+    Repartition, SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
     ToStringifiedPlan, Union, UserDefinedLogicalNode, Values, Window,
 };
 pub use nullif::SUPPORTED_NULLIF_TYPES;
diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs
index 8e4dfc0eb..2cfe921e6 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -25,9 +25,9 @@ pub use plan::{
     Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
     CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, DropView,
     EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit,
-    LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
-    StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
-    Values, Window,
+    LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition,
+    SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
+    ToStringifiedPlan, Union, Values, Window,
 };
 
 pub use display::display_schema;
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index ce169f6ec..27586d702 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -106,6 +106,8 @@ pub enum LogicalPlan {
     Extension(Extension),
     /// Remove duplicate rows from the input
     Distinct(Distinct),
+    /// Set a Varaible
+    SetVariable(SetVariable),
 }
 
 impl LogicalPlan {
@@ -144,6 +146,7 @@ impl LogicalPlan {
             LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => schema,
             LogicalPlan::DropTable(DropTable { schema, .. }) => schema,
             LogicalPlan::DropView(DropView { schema, .. }) => schema,
+            LogicalPlan::SetVariable(SetVariable { schema, .. }) => schema,
         }
     }
 
@@ -200,7 +203,9 @@ impl LogicalPlan {
             | LogicalPlan::CreateView(CreateView { input, .. })
             | LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
             LogicalPlan::Distinct(Distinct { input, .. }) => input.all_schemas(),
-            LogicalPlan::DropTable(_) | LogicalPlan::DropView(_) => vec![],
+            LogicalPlan::DropTable(_)
+            | LogicalPlan::DropView(_)
+            | LogicalPlan::SetVariable(_) => vec![],
         }
     }
 
@@ -260,6 +265,7 @@ impl LogicalPlan {
             | LogicalPlan::CreateCatalogSchema(_)
             | LogicalPlan::CreateCatalog(_)
             | LogicalPlan::DropTable(_)
+            | LogicalPlan::SetVariable(_)
             | LogicalPlan::DropView(_)
             | LogicalPlan::CrossJoin(_)
             | LogicalPlan::Analyze { .. }
@@ -305,6 +311,7 @@ impl LogicalPlan {
             | LogicalPlan::CreateCatalogSchema(_)
             | LogicalPlan::CreateCatalog(_)
             | LogicalPlan::DropTable(_)
+            | LogicalPlan::SetVariable(_)
             | LogicalPlan::DropView(_) => vec![],
         }
     }
@@ -455,6 +462,7 @@ impl LogicalPlan {
             | LogicalPlan::CreateCatalogSchema(_)
             | LogicalPlan::CreateCatalog(_)
             | LogicalPlan::DropTable(_)
+            | LogicalPlan::SetVariable(_)
             | LogicalPlan::DropView(_) => true,
         };
         if !recurse {
@@ -939,6 +947,11 @@ impl LogicalPlan {
                     }) => {
                         write!(f, "DropView: {:?} if not exist:={}", name, if_exists)
                     }
+                    LogicalPlan::SetVariable(SetVariable {
+                        variable, value, ..
+                    }) => {
+                        write!(f, "SetVariable: set {:?} to {:?}", variable, value)
+                    }
                     LogicalPlan::Distinct(Distinct { .. }) => {
                         write!(f, "Distinct:")
                     }
@@ -1055,6 +1068,17 @@ pub struct DropView {
     pub schema: DFSchemaRef,
 }
 
+/// Set a Variable -- value in [`ConfigOptions`]
+#[derive(Clone)]
+pub struct SetVariable {
+    /// The variable name
+    pub variable: String,
+    /// The value to set
+    pub value: String,
+    /// Dummy schema
+    pub schema: DFSchemaRef,
+}
+
 /// Produces no rows: An empty relation with an empty schema
 #[derive(Clone)]
 pub struct EmptyRelation {
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index be4c45f8c..47939b733 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -584,6 +584,7 @@ pub fn from_plan(
         | LogicalPlan::CreateExternalTable(_)
         | LogicalPlan::DropTable(_)
         | LogicalPlan::DropView(_)
+        | LogicalPlan::SetVariable(_)
         | LogicalPlan::CreateCatalogSchema(_)
         | LogicalPlan::CreateCatalog(_) => {
             // All of these plan types have no inputs / exprs so should not be called
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index daa164aec..6ab1dd1fc 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -226,6 +226,7 @@ fn optimize(
         | LogicalPlan::CreateCatalog(_)
         | LogicalPlan::DropTable(_)
         | LogicalPlan::DropView(_)
+        | LogicalPlan::SetVariable(_)
         | LogicalPlan::Distinct(_)
         | LogicalPlan::Extension { .. } => {
             // apply the optimization to all inputs of the plan
diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs
index b2e776821..9bd1ef443 100644
--- a/datafusion/optimizer/src/projection_push_down.rs
+++ b/datafusion/optimizer/src/projection_push_down.rs
@@ -485,6 +485,7 @@ fn optimize_plan(
         | LogicalPlan::CreateCatalog(_)
         | LogicalPlan::DropTable(_)
         | LogicalPlan::DropView(_)
+        | LogicalPlan::SetVariable(_)
         | LogicalPlan::CrossJoin(_)
         | LogicalPlan::Distinct(_)
         | LogicalPlan::Extension { .. } => {
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index 13aaceb79..552045044 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -1351,6 +1351,9 @@ impl AsLogicalPlan for LogicalPlanNode {
             LogicalPlan::DropView(_) => Err(proto_error(
                 "LogicalPlan serde is not yet implemented for DropView",
             )),
+            LogicalPlan::SetVariable(_) => Err(proto_error(
+                "LogicalPlan serde is not yet implemented for DropView",
+            )),
         }
     }
 }
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 0d82fe356..2bc854f2e 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -27,7 +27,7 @@ use datafusion_expr::logical_plan::{
     Analyze, CreateCatalog, CreateCatalogSchema,
     CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, CreateView,
     DropTable, DropView, Explain, JoinType, LogicalPlan, LogicalPlanBuilder,
-    Partitioning, PlanType, ToStringifiedPlan,
+    Partitioning, PlanType, SetVariable, ToStringifiedPlan,
 };
 use datafusion_expr::utils::{
     can_hash, expand_qualified_wildcard, expand_wildcard, expr_as_column_expr,
@@ -161,6 +161,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             } => self.explain_statement_to_plan(verbose, analyze, *statement),
             Statement::Query(query) => self.query_to_plan(*query, &mut HashMap::new()),
             Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
+            Statement::SetVariable {
+                local,
+                hivevar,
+                variable,
+                value,
+            } => self.set_variable_to_plan(local, hivevar, &variable, value),
+
             Statement::CreateTable {
                 query: Some(query),
                 name,
@@ -2447,6 +2454,84 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         self.statement_to_plan(rewrite.pop_front().unwrap())
     }
 
+    fn set_variable_to_plan(
+        &self,
+        local: bool,
+        hivevar: bool,
+        variable: &ObjectName,
+        value: Vec<sqlparser::ast::Expr>,
+    ) -> Result<LogicalPlan> {
+        if local {
+            return Err(DataFusionError::NotImplemented(
+                "LOCAL is not supported".to_string(),
+            ));
+        }
+
+        if hivevar {
+            return Err(DataFusionError::NotImplemented(
+                "HIVEVAR is not supported".to_string(),
+            ));
+        }
+
+        let variable = variable.to_string();
+        let mut variable_lower = variable.to_lowercase();
+
+        if variable_lower == "timezone" || variable_lower == "time.zone" {
+            // we could introduce alias in OptionDefinition if this string matching thing grows
+            variable_lower = "datafusion.execution.time_zone".to_string();
+        }
+
+        // we don't support change time zone until we complete time zone related implementation
+        if variable_lower == "datafusion.execution.time_zone" {
+            return Err(DataFusionError::Plan(
+                "Changing Time Zone isn't supported yet".to_string(),
+            ));
+        }
+
+        // parse value string from Expr
+        let value_string = match &value[0] {
+            SQLExpr::Identifier(i) => i.to_string(),
+            SQLExpr::Value(v) => match v {
+                Value::SingleQuotedString(s) => s.to_string(),
+                Value::Number(_, _) | Value::Boolean(_) => v.to_string(),
+                Value::DoubleQuotedString(_)
+                | Value::EscapedStringLiteral(_)
+                | Value::NationalStringLiteral(_)
+                | Value::HexStringLiteral(_)
+                | Value::Null
+                | Value::Placeholder(_) => {
+                    return Err(DataFusionError::Plan(format!(
+                        "Unspported Value {}",
+                        value[0]
+                    )))
+                }
+            },
+            // for capture signed number e.g. +8, -8
+            SQLExpr::UnaryOp { op, expr } => match op {
+                UnaryOperator::Plus => format!("+{}", expr),
+                UnaryOperator::Minus => format!("-{}", expr),
+                _ => {
+                    return Err(DataFusionError::Plan(format!(
+                        "Unspported Value {}",
+                        value[0]
+                    )))
+                }
+            },
+            _ => {
+                return Err(DataFusionError::Plan(format!(
+                    "Unspported Value {}",
+                    value[0]
+                )))
+            }
+        };
+
+        Ok(LogicalPlan::SetVariable(SetVariable {
+            variable: variable_lower,
+            value: value_string,
+            schema: DFSchemaRef::new(DFSchema::empty()),
+        }))
+    }
+
     fn show_columns_to_plan(
         &self,
         extended: bool,