You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/05/28 10:38:40 UTC

[flink] branch release-1.13 updated (8d62fe8 -> 320ed88)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 8d62fe8  [FLINK-22655][sql-client] Fix "-i init.sql" doesn't work when first line is a comment
     new df2d088  [FLINK-22770][sql-parser][planner-blink] Expose SET/RESET
     new 320ed88  [FLINK-22770][docs] Update usage of SET in docs to use quotes

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../content.zh/docs/dev/table/concepts/timezone.md | 36 ++++-----
 docs/content.zh/docs/dev/table/config.md           |  6 +-
 docs/content.zh/docs/dev/table/sql/reset.md        |  4 +-
 docs/content.zh/docs/dev/table/sql/set.md          |  4 +-
 docs/content.zh/docs/dev/table/sqlClient.md        | 50 ++++++-------
 docs/content/docs/dev/table/concepts/timezone.md   | 36 ++++-----
 docs/content/docs/dev/table/config.md              |  6 +-
 docs/content/docs/dev/table/sql/reset.md           |  4 +-
 docs/content/docs/dev/table/sql/set.md             |  4 +-
 docs/content/docs/dev/table/sqlClient.md           | 50 ++++++-------
 .../src/main/codegen/data/Parser.tdd               |  4 +
 .../src/main/codegen/includes/parserImpls.ftl      | 45 ++++++++++++
 .../ddl/{SqlUseModules.java => SqlReset.java}      | 52 ++++++++-----
 .../ddl/{SqlAlterDatabase.java => SqlSet.java}     | 85 +++++++++++++---------
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 13 ++++
 .../operations/SqlToOperationConverter.java        | 22 ++++++
 .../planner/parse/ResetOperationParseStrategy.java |  2 +-
 .../planner/parse/SetOperationParseStrategy.java   |  2 +-
 .../table/planner/calcite/FlinkPlannerImpl.scala   |  9 +--
 .../table/planner/delegation/ParserImplTest.java   |  3 +-
 .../operations/SqlToOperationConverterTest.java    | 27 +++++++
 .../parse/ResetOperationParseStrategyTest.java}    | 18 +++--
 .../parse/SetOperationParseStrategyTest.java       | 27 +++----
 .../apache/flink/table/parse/ExtendedParser.java   |  2 +-
 24 files changed, 326 insertions(+), 185 deletions(-)
 copy flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/{SqlUseModules.java => SqlReset.java} (62%)
 copy flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/{SqlAlterDatabase.java => SqlSet.java} (50%)
 copy flink-table/{flink-table-common/src/test/java/org/apache/flink/table/module/CoreModuleTest.java => flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/parse/ResetOperationParseStrategyTest.java} (62%)
 copy flink-core/src/test/java/org/apache/flink/util/IterableUtilsTest.java => flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/parse/SetOperationParseStrategyTest.java (58%)

[flink] 02/02: [FLINK-22770][docs] Update usage of SET in docs to use quotes

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 320ed880513377acc622daf7764a9be70e7704f0
Author: Ingo Bürk <in...@tngtech.com>
AuthorDate: Wed May 26 11:18:56 2021 +0200

    [FLINK-22770][docs] Update usage of SET in docs to use quotes
---
 .../content.zh/docs/dev/table/concepts/timezone.md | 36 ++++++++--------
 docs/content.zh/docs/dev/table/config.md           |  6 +--
 docs/content.zh/docs/dev/table/sql/reset.md        |  4 +-
 docs/content.zh/docs/dev/table/sql/set.md          |  4 +-
 docs/content.zh/docs/dev/table/sqlClient.md        | 50 +++++++++++-----------
 docs/content/docs/dev/table/concepts/timezone.md   | 36 ++++++++--------
 docs/content/docs/dev/table/config.md              |  6 +--
 docs/content/docs/dev/table/sql/reset.md           |  4 +-
 docs/content/docs/dev/table/sql/set.md             |  4 +-
 docs/content/docs/dev/table/sqlClient.md           | 50 +++++++++++-----------
 10 files changed, 100 insertions(+), 100 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/concepts/timezone.md b/docs/content.zh/docs/dev/table/concepts/timezone.md
index 870dffa..730489e 100644
--- a/docs/content.zh/docs/dev/table/concepts/timezone.md
+++ b/docs/content.zh/docs/dev/table/concepts/timezone.md
@@ -49,7 +49,7 @@ Flink SQL> SELECT TIMESTAMP '1970-01-01 00:00:04.001';
 
  ```sql
 Flink SQL> CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3);
-Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SET 'table.local-time-zone' = 'UTC';
 Flink SQL> SELECT * FROM T1;
 +---------------------------+
 | TO_TIMESTAMP_LTZ(4001, 3) |
@@ -57,7 +57,7 @@ Flink SQL> SELECT * FROM T1;
 |   1970-01-01 00:00:04.001 |
 +---------------------------+
 
-Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
 Flink SQL> SELECT * FROM T1;
 +---------------------------+
 | TO_TIMESTAMP_LTZ(4001, 3) |
@@ -76,13 +76,13 @@ Flink SQL> SELECT * FROM T1;
 {{< tab "SQL Client" >}}
 ```sql
 -- 设置为 UTC 时区
-Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SET 'table.local-time-zone' = 'UTC';
 
 -- 设置为上海时区
-Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
 
 -- 设置为Los_Angeles时区
-Flink SQL> SET table.local-time-zone=America/Los_Angeles;
+Flink SQL> SET 'table.local-time-zone' = 'America/Los_Angeles';
 ```
 {{< /tab >}}
 {{< tab "Java" >}}
@@ -132,7 +132,7 @@ session (会话)中配置的时区会对以下函数生效。
 
 
 ```sql
-Flink SQL> SET sql-client.execution.result-mode=tableau;
+Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
 Flink SQL> CREATE VIEW MyView1 AS SELECT LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP(), NOW(), PROCTIME();
 Flink SQL> DESC MyView1;
 ```
@@ -153,7 +153,7 @@ Flink SQL> DESC MyView1;
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SET 'table.local-time-zone' = 'UTC';
 Flink SQL> SELECT * FROM MyView1;
 ```
 
@@ -166,7 +166,7 @@ Flink SQL> SELECT * FROM MyView1;
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
 Flink SQL> SELECT * FROM MyView1;
 ```
 
@@ -195,7 +195,7 @@ Flink SQL> DESC MyView2;
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SET 'table.local-time-zone' = 'UTC';
 Flink SQL> SELECT * FROM MyView2;
 ```
 
@@ -208,7 +208,7 @@ Flink SQL> SELECT * FROM MyView2;
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
 Flink SQL> SELECT * FROM MyView2;
 ```
 
@@ -264,7 +264,7 @@ Flink SQL 使用函数 `PROCTIME()` 来定义处理时间属性, 该函数返
 `PROCTIME()` 返回的是本地时区的时间, 使用 `TIMESTAMP_LTZ` 类型也可以支持夏令时时间。
 
 ```sql
-Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SET 'table.local-time-zone' = 'UTC';
 Flink SQL> SELECT PROCTIME();
 ```
 ```
@@ -276,7 +276,7 @@ Flink SQL> SELECT PROCTIME();
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
 Flink SQL> SELECT PROCTIME();
 ```
 ```
@@ -336,7 +336,7 @@ C,3.8
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SET 'table.local-time-zone' = 'UTC';
 Flink SQL> SELECT * FROM MyView3;
 ```
 
@@ -351,7 +351,7 @@ Flink SQL> SELECT * FROM MyView3;
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
 Flink SQL> SELECT * FROM MyView3;
 ```
 
@@ -426,7 +426,7 @@ C,3.8,2021-04-15 14:11:00
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=UTC; 
+Flink SQL> SET 'table.local-time-zone' = 'UTC'; 
 Flink SQL> SELECT * FROM MyView4;
 ```
                
@@ -441,7 +441,7 @@ Flink SQL> SELECT * FROM MyView4;
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=Asia/Shanghai; 
+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai'; 
 Flink SQL> SELECT * FROM MyView4;
 ```
 
@@ -508,7 +508,7 @@ C,3.8,1618495860000  # The corresponding utc timestamp is 2021-04-15 14:11:00
 ```    
 
 ```sql
-Flink SQL> SET table.local-time-zone=UTC; 
+Flink SQL> SET 'table.local-time-zone' = 'UTC'; 
 Flink SQL> SELECT * FROM MyView5;
 ```                         
                
@@ -523,7 +523,7 @@ Flink SQL> SELECT * FROM MyView5;
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=Asia/Shanghai; 
+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai'; 
 Flink SQL> SELECT * FROM MyView5;
 ```
 
diff --git a/docs/content.zh/docs/dev/table/config.md b/docs/content.zh/docs/dev/table/config.md
index 94d0048..a313bdc 100644
--- a/docs/content.zh/docs/dev/table/config.md
+++ b/docs/content.zh/docs/dev/table/config.md
@@ -84,9 +84,9 @@ configuration.set_string("table.exec.mini-batch.size", "5000")
 {{< /tab >}}
 {{< tab "SQL CLI" >}}
 ```
-Flink SQL> SET table.exec.mini-batch.enabled = true;
-Flink SQL> SET table.exec.mini-batch.allow-latency = 5s;
-Flink SQL> SET table.exec.mini-batch.size = 5000;
+Flink SQL> SET 'table.exec.mini-batch.enabled' = 'true';
+Flink SQL> SET 'table.exec.mini-batch.allow-latency' = '5s';
+Flink SQL> SET 'table.exec.mini-batch.size' = '5000';
 ```
 {{< /tab >}}
 {{< /tabs >}}
diff --git a/docs/content.zh/docs/dev/table/sql/reset.md b/docs/content.zh/docs/dev/table/sql/reset.md
index 24c856f..28c9220 100644
--- a/docs/content.zh/docs/dev/table/sql/reset.md
+++ b/docs/content.zh/docs/dev/table/sql/reset.md
@@ -43,7 +43,7 @@ The following examples show how to run a `RESET` statement in SQL CLI.
 {{< tabs "reset" >}}
 {{< tab "SQL CLI" >}}
 ```sql
-Flink SQL> RESET table.planner;
+Flink SQL> RESET 'table.planner';
 [INFO] Session property has been reset.
 
 Flink SQL> RESET;
@@ -55,7 +55,7 @@ Flink SQL> RESET;
 ## Syntax
 
 ```sql
-RESET (key)?
+RESET ('key')?
 ```
 
 If no key is specified, it resets all the properties to their defaults. Otherwise, it resets the specified key to its default.
diff --git a/docs/content.zh/docs/dev/table/sql/set.md b/docs/content.zh/docs/dev/table/sql/set.md
index 83c79e9..9ce249f 100644
--- a/docs/content.zh/docs/dev/table/sql/set.md
+++ b/docs/content.zh/docs/dev/table/sql/set.md
@@ -43,7 +43,7 @@ The following examples show how to run a `SET` statement in SQL CLI.
 {{< tabs "set" >}}
 {{< tab "SQL CLI" >}}
 ```sql
-Flink SQL> SET table.planner = blink;
+Flink SQL> SET 'table.planner' = 'blink';
 [INFO] Session property has been set.
 
 Flink SQL> SET;
@@ -55,7 +55,7 @@ table.planner=blink;
 ## Syntax
 
 ```sql
-SET (key = value)?
+SET ('key' = 'value')?
 ```
 
 If no key and value are specified, it just prints all the properties. Otherwise, it sets the key to the specified value.
diff --git a/docs/content.zh/docs/dev/table/sqlClient.md b/docs/content.zh/docs/dev/table/sqlClient.md
index 8b57c32..6173154 100644
--- a/docs/content.zh/docs/dev/table/sqlClient.md
+++ b/docs/content.zh/docs/dev/table/sqlClient.md
@@ -74,20 +74,20 @@ CLI 为维护和可视化结果提供**三种模式**。
 **表格模式**(table mode)在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。执行如下命令启用:
 
 ```text
-SET sql-client.execution.result-mode=table;
+SET 'sql-client.execution.result-mode' = 'table';
 ```
 
 **变更日志模式**(changelog mode)不会实体化和可视化结果,而是由插入(`+`)和撤销(`-`)组成的持续查询产生结果流。
 
 ```text
-SET sql-client.execution.result-mode=changelog;
+SET 'sql-client.execution.result-mode' = 'changelog';
 ```
 
 **Tableau模式**(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业
 执行模式的不同(`execution.type`):
 
 ```text
-SET sql-client.execution.result-mode=tableau;
+SET 'sql-client.execution.result-mode' = 'tableau';
 ```
 
 注意当你使用这个模式运行一个流式查询的时候,Flink 会将结果持续的打印在当前的屏幕之上。如果这个流式查询的输入是有限的数据集,
@@ -339,21 +339,21 @@ CREATE FUNCTION foo.bar.AggregateUDF AS myUDF;
 
 -- Properties that change the fundamental execution behavior of a table program.
 
-SET table.planner = blink; -- planner: either 'blink' (default) or 'old'
-SET execution.runtime-mode = streaming; -- execution mode either 'batch' or 'streaming'
-SET sql-client.execution.result-mode = table; -- available values: 'table', 'changelog' and 'tableau'
-SET sql-client.execution.max-table-result.rows = 10000; -- optional: maximum number of maintained rows
-SET parallelism.default = 1; -- optional: Flink's parallelism (1 by default)
-SET pipeline.auto-watermark-interval = 200; --optional: interval for periodic watermarks
-SET pipeline.max-parallelism = 10; -- optional: Flink's maximum parallelism
-SET table.exec.state.ttl=1000; -- optional: table program's idle state time
-SET restart-strategy = fixed-delay;
+SET 'table.planner' = 'blink'; -- planner: either 'blink' (default) or 'old'
+SET 'execution.runtime-mode' = 'streaming'; -- execution mode either 'batch' or 'streaming'
+SET 'sql-client.execution.result-mode' = 'table'; -- available values: 'table', 'changelog' and 'tableau'
+SET 'sql-client.execution.max-table-result.rows' = '10000'; -- optional: maximum number of maintained rows
+SET 'parallelism.default' = '1'; -- optional: Flink's parallelism (1 by default)
+SET 'pipeline.auto-watermark-interval' = '200'; --optional: interval for periodic watermarks
+SET 'pipeline.max-parallelism' = '10'; -- optional: Flink's maximum parallelism
+SET 'table.exec.state.ttl' = '1000'; -- optional: table program's idle state time
+SET 'restart-strategy' = 'fixed-delay';
 
 -- Configuration options for adjusting and tuning table programs.
 
-SET table.optimizer.join-reorder-enabled = true;
-SET table.exec.spill-compression.enabled = true;
-SET table.exec.spill-compression.block-size = 128kb;
+SET 'table.optimizer.join-reorder-enabled' = 'true';
+SET 'table.exec.spill-compression.enabled' = 'true';
+SET 'table.exec.spill-compression.block-size' = '128kb';
 ```
 
 This configuration:
@@ -403,7 +403,7 @@ In interactive Command Line, the SQL Client reads user inputs and executes the s
 
 SQL Client will print success message if the statement is executed successfully. When getting errors, SQL Client will also print error messages.
 By default, the error message only contains the error cause. In order to print the full exception stack for debugging, please set the
-`sql-client.verbose` to true through command `SET sql-client.verbose = true;`.
+`sql-client.verbose` to true through command `SET 'sql-client.verbose' = 'true';`.
 
 ### Execute SQL Files
 
@@ -429,19 +429,19 @@ CREATE TEMPORARY TABLE users (
 );
 
 -- set sync mode
-SET table.dml-sync=true;
+SET 'table.dml-sync' = 'true';
 
 -- set the job name
-SET pipeline.name=SqlJob;
+SET 'pipeline.name' = 'SqlJob';
 
 -- set the queue that the job submit to
-SET yarn.application.queue=root;
+SET 'yarn.application.queue' = 'root';
 
 -- set the job parallelism
-SET parallism.default=100;
+SET 'parallism.default' = '100';
 
 -- restore from the specific savepoint path
-SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab;
+SET 'execution.savepoint.path' = '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab';
 
 INSERT INTO pageviews_enriched
 SELECT *
@@ -609,7 +609,7 @@ previous DML statement finishes. In order to execute DML statements synchronousl
 `table.dml-sync` option true in SQL Client.
 
 ```sql
-Flink SQL> SET table.dml-sync = true;
+Flink SQL> SET 'table.dml-sync' = 'true';
 [INFO] Session property has been set.
 
 Flink SQL> INSERT INTO MyTableSink SELECT * FROM MyTableSource;
@@ -625,7 +625,7 @@ Flink SQL> INSERT INTO MyTableSink SELECT * FROM MyTableSource;
 Flink supports to start the job with specified savepoint. In SQL Client, it's allowed to use `SET` command to specify the path of the savepoint.
 
 ```sql
-Flink SQL> SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab;
+Flink SQL> SET 'execution.savepoint.path' = '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab';
 [INFO] Session property has been set.
 
 -- all the following DML statements will be restored from the specified savepoint path
@@ -648,7 +648,7 @@ For more details about creating and managing savepoints, please refer to [Job Li
 SQL Client supports to define job name for queries and DML statements through `SET` command.
 
 ```sql
-Flink SQL> SET pipeline.name= 'kafka-to-hive' ;
+Flink SQL> SET 'pipeline.name' = 'kafka-to-hive';
 [INFO] Session property has been set.
 
 -- all the following DML statements will use the specified job name.
@@ -673,7 +673,7 @@ To be compatible with before, SQL Client still supports to initialize with envir
 When set the key defined in YAML file, the SQL Client will print the warning messages to inform.
 
 ```sql
-Flink SQL> SET execution.type = batch;
+Flink SQL> SET 'execution.type' = 'batch';
 [WARNING] The specified key 'execution.type' is deprecated. Please use 'execution.runtime-mode' instead.
 [INFO] Session property has been set.
 
diff --git a/docs/content/docs/dev/table/concepts/timezone.md b/docs/content/docs/dev/table/concepts/timezone.md
index 910b603..03e30f0 100644
--- a/docs/content/docs/dev/table/concepts/timezone.md
+++ b/docs/content/docs/dev/table/concepts/timezone.md
@@ -49,7 +49,7 @@ Flink SQL> SELECT TIMESTAMP '1970-01-01 00:00:04.001';
 
  ```sql
 Flink SQL> CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3);
-Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SET 'table.local-time-zone' = 'UTC';
 Flink SQL> SELECT * FROM T1;
 +---------------------------+
 | TO_TIMESTAMP_LTZ(4001, 3) |
@@ -57,7 +57,7 @@ Flink SQL> SELECT * FROM T1;
 |   1970-01-01 00:00:04.001 |
 +---------------------------+
 
-Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
 Flink SQL> SELECT * FROM T1;
 +---------------------------+
 | TO_TIMESTAMP_LTZ(4001, 3) |
@@ -76,13 +76,13 @@ The local time zone defines current session time zone id. You can config the tim
 {{< tab "SQL Client" >}}
 ```sql
 -- set to UTC time zone
-Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SET 'table.local-time-zone' = 'UTC';
 
 -- set to Shanghai time zone
-Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
 
 -- set to Los_Angeles time zone
-Flink SQL> SET table.local-time-zone=America/Los_Angeles;
+Flink SQL> SET 'table.local-time-zone' = 'America/Los_Angeles';
 ```
 {{< /tab >}}
 {{< tab "Java" >}}
@@ -132,7 +132,7 @@ The following time functions is influenced by the configured time zone.
 
 
 ```sql
-Flink SQL> SET sql-client.execution.result-mode=tableau;
+Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
 Flink SQL> CREATE VIEW MyView1 AS SELECT LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP(), NOW(), PROCTIME();
 Flink SQL> DESC MyView1;
 ```
@@ -153,7 +153,7 @@ Flink SQL> DESC MyView1;
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SET 'table.local-time-zone' = 'UTC';
 Flink SQL> SELECT * FROM MyView1;
 ```
 
@@ -166,7 +166,7 @@ Flink SQL> SELECT * FROM MyView1;
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
 Flink SQL> SELECT * FROM MyView1;
 ```
 
@@ -195,7 +195,7 @@ Flink SQL> DESC MyView2;
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SET 'table.local-time-zone' = 'UTC';
 Flink SQL> SELECT * FROM MyView2;
 ```
 
@@ -208,7 +208,7 @@ Flink SQL> SELECT * FROM MyView2;
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
 Flink SQL> SELECT * FROM MyView2;
 ```
 
@@ -265,7 +265,7 @@ Flink 1.13 fixes this issue and uses `TIMESTAMP_LTZ` type as return type of `PRO
 The PROCTIME() always represents your local timestamp value, using TIMESTAMP_LTZ type can also support DayLight Saving Time well.
 
 ```sql
-Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SET 'table.local-time-zone' = 'UTC';
 Flink SQL> SELECT PROCTIME();
 ```
 ```
@@ -277,7 +277,7 @@ Flink SQL> SELECT PROCTIME();
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
 Flink SQL> SELECT PROCTIME();
 ```
 ```
@@ -337,7 +337,7 @@ C,3.8
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SET 'table.local-time-zone' = 'UTC';
 Flink SQL> SELECT * FROM MyView3;
 ```
 
@@ -352,7 +352,7 @@ Flink SQL> SELECT * FROM MyView3;
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
 Flink SQL> SELECT * FROM MyView3;
 ```
 
@@ -427,7 +427,7 @@ C,3.8,2021-04-15 14:11:00
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SET 'table.local-time-zone' = 'UTC';
 Flink SQL> SELECT * FROM MyView4;
 ```
 
@@ -442,7 +442,7 @@ Flink SQL> SELECT * FROM MyView4;
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
 Flink SQL> SELECT * FROM MyView4;
 ```
 
@@ -509,7 +509,7 @@ C,3.8,1618495860000  # The corresponding utc timestamp is 2021-04-15 14:11:00
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SET 'table.local-time-zone' = 'UTC';
 Flink SQL> SELECT * FROM MyView5;
 ```
 
@@ -524,7 +524,7 @@ Flink SQL> SELECT * FROM MyView5;
 ```
 
 ```sql
-Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
 Flink SQL> SELECT * FROM MyView5;
 ```
 
diff --git a/docs/content/docs/dev/table/config.md b/docs/content/docs/dev/table/config.md
index a4fd7b7..17f7d72 100644
--- a/docs/content/docs/dev/table/config.md
+++ b/docs/content/docs/dev/table/config.md
@@ -91,9 +91,9 @@ configuration.set_string("table.exec.mini-batch.size", "5000")
 {{< /tab >}}
 {{< tab "SQL CLI" >}}
 ```
-Flink SQL> SET table.exec.mini-batch.enabled = true;
-Flink SQL> SET table.exec.mini-batch.allow-latency = 5s;
-Flink SQL> SET table.exec.mini-batch.size = 5000;
+Flink SQL> SET 'table.exec.mini-batch.enabled' = 'true';
+Flink SQL> SET 'table.exec.mini-batch.allow-latency' = '5s';
+Flink SQL> SET 'table.exec.mini-batch.size' = '5000';
 ```
 {{< /tab >}}
 {{< /tabs >}}
diff --git a/docs/content/docs/dev/table/sql/reset.md b/docs/content/docs/dev/table/sql/reset.md
index cd7caf0..b08aac9 100644
--- a/docs/content/docs/dev/table/sql/reset.md
+++ b/docs/content/docs/dev/table/sql/reset.md
@@ -43,7 +43,7 @@ The following examples show how to run a `RESET` statement in SQL CLI.
 {{< tabs "reset" >}}
 {{< tab "SQL CLI" >}}
 ```sql
-Flink SQL> RESET table.planner;
+Flink SQL> RESET 'table.planner';
 [INFO] Session property has been reset.
 
 Flink SQL> RESET;
@@ -55,7 +55,7 @@ Flink SQL> RESET;
 ## Syntax
 
 ```sql
-RESET (key)?
+RESET ('key')?
 ```
 
 If no key is specified, it resets all the properties to their defaults. Otherwise, it resets the specified key to its default.
diff --git a/docs/content/docs/dev/table/sql/set.md b/docs/content/docs/dev/table/sql/set.md
index 18ca6fd..05d9bd7 100644
--- a/docs/content/docs/dev/table/sql/set.md
+++ b/docs/content/docs/dev/table/sql/set.md
@@ -43,7 +43,7 @@ The following examples show how to run a `SET` statement in SQL CLI.
 {{< tabs "set" >}}
 {{< tab "SQL CLI" >}}
 ```sql
-Flink SQL> SET table.planner = blink;
+Flink SQL> SET 'table.planner' = 'blink';
 [INFO] Session property has been set.
 
 Flink SQL> SET;
@@ -55,7 +55,7 @@ table.planner=blink;
 ## Syntax
 
 ```sql
-SET (key = value)?
+SET ('key' = 'value')?
 ```
 
 If no key and value are specified, it just prints all the properties. Otherwise, it sets the key to the specified value.
diff --git a/docs/content/docs/dev/table/sqlClient.md b/docs/content/docs/dev/table/sqlClient.md
index 1cb77c9..d546a10 100644
--- a/docs/content/docs/dev/table/sqlClient.md
+++ b/docs/content/docs/dev/table/sqlClient.md
@@ -78,21 +78,21 @@ The **table mode** materializes results in memory and visualizes them in a regul
 It can be enabled by executing the following command in the CLI:
 
 ```text
-SET sql-client.execution.result-mode=table;
+SET 'sql-client.execution.result-mode' = 'table';
 ```
 
 The **changelog mode** does not materialize results and visualizes the result stream that is produced
 by a [continuous query]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}#continuous-queries) consisting of insertions (`+`) and retractions (`-`).
 
 ```text
-SET sql-client.execution.result-mode=changelog;
+SET 'sql-client.execution.result-mode' = 'changelog';
 ```
 
 The **tableau mode** is more like a traditional way which will display the results in the screen directly with a tableau format.
 The displaying content will be influenced by the query execution type(`execution.type`).
 
 ```text
-SET sql-client.execution.result-mode=tableau;
+SET 'sql-client.execution.result-mode' = 'tableau';
 ```
 
 Note that when you use this mode with streaming query, the result will be continuously printed on the console. If the input data of
@@ -345,21 +345,21 @@ CREATE FUNCTION foo.bar.AggregateUDF AS myUDF;
 
 -- Properties that change the fundamental execution behavior of a table program.
 
-SET table.planner = blink; -- planner: either 'blink' (default) or 'old'
-SET execution.runtime-mode = streaming; -- execution mode either 'batch' or 'streaming'
-SET sql-client.execution.result-mode = table; -- available values: 'table', 'changelog' and 'tableau'
-SET sql-client.execution.max-table-result.rows = 10000; -- optional: maximum number of maintained rows
-SET parallelism.default = 1; -- optional: Flink's parallelism (1 by default)
-SET pipeline.auto-watermark-interval = 200; --optional: interval for periodic watermarks
-SET pipeline.max-parallelism = 10; -- optional: Flink's maximum parallelism
-SET table.exec.state.ttl=1000; -- optional: table program's idle state time
-SET restart-strategy = fixed-delay;
+SET 'table.planner' = 'blink'; -- planner: either 'blink' (default) or 'old'
+SET 'execution.runtime-mode' = 'streaming'; -- execution mode either 'batch' or 'streaming'
+SET 'sql-client.execution.result-mode' = 'table'; -- available values: 'table', 'changelog' and 'tableau'
+SET 'sql-client.execution.max-table-result.rows' = '10000'; -- optional: maximum number of maintained rows
+SET 'parallelism.default' = '1'; -- optional: Flink's parallelism (1 by default)
+SET 'pipeline.auto-watermark-interval' = '200'; --optional: interval for periodic watermarks
+SET 'pipeline.max-parallelism' = '10'; -- optional: Flink's maximum parallelism
+SET 'table.exec.state.ttl' = '1000'; -- optional: table program's idle state time
+SET 'restart-strategy' = 'fixed-delay';
 
 -- Configuration options for adjusting and tuning table programs.
 
-SET table.optimizer.join-reorder-enabled = true;
-SET table.exec.spill-compression.enabled = true;
-SET table.exec.spill-compression.block-size = 128kb;
+SET 'table.optimizer.join-reorder-enabled' = 'true';
+SET 'table.exec.spill-compression.enabled' = 'true';
+SET 'table.exec.spill-compression.block-size' = '128kb';
 ```
 
 This configuration:
@@ -409,7 +409,7 @@ In interactive Command Line, the SQL Client reads user inputs and executes the s
 
 SQL Client will print success message if the statement is executed successfully. When getting errors, SQL Client will also print error messages.
 By default, the error message only contains the error cause. In order to print the full exception stack for debugging, please set the
-`sql-client.verbose` to true through command `SET sql-client.verbose = true;`.
+`sql-client.verbose` to true through command `SET 'sql-client.verbose' = 'true';`.
 
 ### Execute SQL Files
 
@@ -435,19 +435,19 @@ CREATE TEMPORARY TABLE users (
 );
 
 -- set sync mode
-SET table.dml-sync=true;
+SET 'table.dml-sync' = 'true';
 
 -- set the job name
-SET pipeline.name=SqlJob;
+SET 'pipeline.name' = 'SqlJob';
 
 -- set the queue that the job submit to
-SET yarn.application.queue=root;
+SET 'yarn.application.queue' = 'root';
 
 -- set the job parallelism
-SET parallism.default=100;
+SET 'parallism.default' = '100';
 
 -- restore from the specific savepoint path
-SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab;
+SET 'execution.savepoint.path' = '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab';
 
 INSERT INTO pageviews_enriched
 SELECT *
@@ -615,7 +615,7 @@ previous DML statement finishes. In order to execute DML statements synchronousl
 `table.dml-sync` option true in SQL Client.
 
 ```sql
-Flink SQL> SET table.dml-sync = true;
+Flink SQL> SET 'table.dml-sync' = 'true';
 [INFO] Session property has been set.
 
 Flink SQL> INSERT INTO MyTableSink SELECT * FROM MyTableSource;
@@ -631,7 +631,7 @@ Flink SQL> INSERT INTO MyTableSink SELECT * FROM MyTableSource;
 Flink supports to start the job with specified savepoint. In SQL Client, it's allowed to use `SET` command to specify the path of the savepoint.
 
 ```sql
-Flink SQL> SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab;
+Flink SQL> SET 'execution.savepoint.path' = '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab';
 [INFO] Session property has been set.
 
 -- all the following DML statements will be restored from the specified savepoint path
@@ -654,7 +654,7 @@ For more details about creating and managing savepoints, please refer to [Job Li
 SQL Client supports to define job name for queries and DML statements through `SET` command.
 
 ```sql
-Flink SQL> SET pipeline.name= 'kafka-to-hive' ;
+Flink SQL> SET 'pipeline.name' = 'kafka-to-hive';
 [INFO] Session property has been set.
 
 -- all the following DML statements will use the specified job name.
@@ -679,7 +679,7 @@ To be compatible with before, SQL Client still supports to initialize with envir
 When set the key defined in YAML file, the SQL Client will print the warning messages to inform.
 
 ```sql
-Flink SQL> SET execution.type = batch;
+Flink SQL> SET 'execution.type' = 'batch';
 [WARNING] The specified key 'execution.type' is deprecated. Please use 'execution.runtime-mode' instead.
 [INFO] Session property has been set.
 

[flink] 01/02: [FLINK-22770][sql-parser][planner-blink] Expose SET/RESET

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit df2d08837d89e94066393942f5147b6a0ffead22
Author: Ingo Bürk <in...@tngtech.com>
AuthorDate: Thu May 27 07:48:48 2021 +0200

    [FLINK-22770][sql-parser][planner-blink] Expose SET/RESET
    
    This adds the SET and RESET DCL statements directly into the parser and
    exposes them as their respective (pre-existing) operations.
    
    A major difference to the current SET/RESET supported in the SQL Client
    is that we now require quoting of both key and value for consistency with
    options elsewhere in Flink SQL. To avoid a breaking change, the SQL Client
    specific implementation is kept for now which takes precedence.
---
 .../src/main/codegen/data/Parser.tdd               |   4 +
 .../src/main/codegen/includes/parserImpls.ftl      |  45 +++++++++
 .../org/apache/flink/sql/parser/ddl/SqlReset.java  |  84 ++++++++++++++++
 .../org/apache/flink/sql/parser/ddl/SqlSet.java    | 107 +++++++++++++++++++++
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  13 +++
 .../operations/SqlToOperationConverter.java        |  22 +++++
 .../planner/parse/ResetOperationParseStrategy.java |   2 +-
 .../planner/parse/SetOperationParseStrategy.java   |   2 +-
 .../table/planner/calcite/FlinkPlannerImpl.scala   |   9 +-
 .../table/planner/delegation/ParserImplTest.java   |   3 +-
 .../operations/SqlToOperationConverterTest.java    |  27 ++++++
 .../parse/ResetOperationParseStrategyTest.java     |  39 ++++++++
 .../parse/SetOperationParseStrategyTest.java       |  40 ++++++++
 .../apache/flink/table/parse/ExtendedParser.java   |   2 +-
 14 files changed, 390 insertions(+), 9 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 5ad878b..96d4c27 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -44,6 +44,8 @@
     "org.apache.flink.sql.parser.ddl.SqlDropFunction"
     "org.apache.flink.sql.parser.ddl.SqlDropTable"
     "org.apache.flink.sql.parser.ddl.SqlDropView"
+    "org.apache.flink.sql.parser.ddl.SqlSet"
+    "org.apache.flink.sql.parser.ddl.SqlReset"
     "org.apache.flink.sql.parser.ddl.SqlTableColumn"
     "org.apache.flink.sql.parser.ddl.SqlTableLike"
     "org.apache.flink.sql.parser.ddl.SqlTableLike.FeatureOption"
@@ -485,6 +487,8 @@
     "SqlUnloadModule()"
     "SqlUseModules()"
     "SqlRichExplain()"
+    "SqlSet()"
+    "SqlReset()"
   ]
 
   # List of methods for parsing custom literals.
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index b4cfc34..e9f32a9 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1590,3 +1590,48 @@ SqlNode SqlRichExplain() :
         return new SqlRichExplain(getPos(),stmt);
     }
 }
+
+/**
+* Parses a SET statement:
+* SET ['key' = 'value'];
+*/
+SqlNode SqlSet() :
+{
+    Span s;
+    SqlNode key = null;
+    SqlNode value = null;
+}
+{
+    <SET> { s = span(); }
+    [
+        key = StringLiteral()
+        <EQ>
+        value = StringLiteral()
+    ]
+    {
+        if (key == null && value == null) {
+            return new SqlSet(s.end(this));
+        } else {
+            return new SqlSet(s.end(this), key, value);
+        }
+    }
+}
+
+/**
+* Parses a RESET statement:
+* RESET ['key'];
+*/
+SqlNode SqlReset() :
+{
+    Span span;
+    SqlNode key = null;
+}
+{
+    <RESET> { span = span(); }
+    [
+        key = StringLiteral()
+    ]
+    {
+        return new SqlReset(span.end(this), key);
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReset.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReset.java
new file mode 100644
index 0000000..7ce1c16
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReset.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** SQL call for "RESET" and "RESET 'key'". */
+@Internal
+public class SqlReset extends SqlCall {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("RESET", SqlKind.OTHER);
+
+    @Nullable private final SqlNode key;
+
+    public SqlReset(SqlParserPos pos, @Nullable SqlNode key) {
+        super(pos);
+        this.key = key;
+    }
+
+    @Override
+    @Nonnull
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    @Nonnull
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(key);
+    }
+
+    public @Nullable SqlNode getKey() {
+        return key;
+    }
+
+    public @Nullable String getKeyString() {
+        if (key == null) {
+            return null;
+        }
+
+        return ((NlsString) SqlLiteral.value(key)).getValue();
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("RESET");
+        if (key != null) {
+            key.unparse(writer, leftPrec, rightPrec);
+        }
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlSet.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlSet.java
new file mode 100644
index 0000000..a808203
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlSet.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/** SQL call for "SET" and "SET 'key' = 'value'". */
+@Internal
+public class SqlSet extends SqlCall {
+
+    public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("SET", SqlKind.OTHER);
+
+    @Nullable private final SqlNode key;
+    @Nullable private final SqlNode value;
+
+    public SqlSet(SqlParserPos pos, SqlNode key, SqlNode value) {
+        super(pos);
+        this.key = Objects.requireNonNull(key, "key cannot be null");
+        this.value = Objects.requireNonNull(value, "value cannot be null");
+    }
+
+    public SqlSet(SqlParserPos pos) {
+        super(pos);
+        this.key = null;
+        this.value = null;
+    }
+
+    @Override
+    @Nonnull
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    @Nonnull
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(key, value);
+    }
+
+    public @Nullable SqlNode getKey() {
+        return key;
+    }
+
+    public @Nullable SqlNode getValue() {
+        return value;
+    }
+
+    public @Nullable String getKeyString() {
+        if (key == null) {
+            return null;
+        }
+
+        return ((NlsString) SqlLiteral.value(key)).getValue();
+    }
+
+    public @Nullable String getValueString() {
+        if (value == null) {
+            return null;
+        }
+
+        return ((NlsString) SqlLiteral.value(value)).getValue();
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("SET");
+
+        if (key != null && value != null) {
+            key.unparse(writer, leftPrec, rightPrec);
+            writer.keyword("=");
+            value.unparse(writer, leftPrec, rightPrec);
+        }
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index d9f523b..194b0c2 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -1298,6 +1298,11 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
     }
 
     @Test
+    public void testSqlOptions() {
+        // SET/RESET are overridden for Flink SQL
+    }
+
+    @Test
     public void testExplainAsJson() {
         // TODO: FLINK-20562
     }
@@ -1315,6 +1320,14 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
         this.sql(sql).ok(expected);
     }
 
+    @Test
+    public void testSetReset() {
+        sql("SET").ok("SET");
+        sql("SET 'test-key' = 'test-value'").ok("SET 'test-key' = 'test-value'");
+        sql("RESET").ok("RESET");
+        sql("RESET 'test-key'").ok("RESET 'test-key'");
+    }
+
     public static BaseMatcher<SqlNode> validated(String validatedSql) {
         return new TypeSafeDiagnosingMatcher<SqlNode>() {
             @Override
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 7111a9e..8d78a63 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -43,6 +43,8 @@ import org.apache.flink.sql.parser.ddl.SqlDropFunction;
 import org.apache.flink.sql.parser.ddl.SqlDropPartitions;
 import org.apache.flink.sql.parser.ddl.SqlDropTable;
 import org.apache.flink.sql.parser.ddl.SqlDropView;
+import org.apache.flink.sql.parser.ddl.SqlReset;
+import org.apache.flink.sql.parser.ddl.SqlSet;
 import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.sql.parser.ddl.SqlUseCatalog;
 import org.apache.flink.sql.parser.ddl.SqlUseDatabase;
@@ -107,6 +109,8 @@ import org.apache.flink.table.operations.UnloadModuleOperation;
 import org.apache.flink.table.operations.UseCatalogOperation;
 import org.apache.flink.table.operations.UseDatabaseOperation;
 import org.apache.flink.table.operations.UseModulesOperation;
+import org.apache.flink.table.operations.command.ResetOperation;
+import org.apache.flink.table.operations.command.SetOperation;
 import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
 import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
@@ -271,6 +275,10 @@ public class SqlToOperationConverter {
                     converter.convertBeginStatementSet((SqlBeginStatementSet) validated));
         } else if (validated instanceof SqlEndStatementSet) {
             return Optional.of(converter.convertEndStatementSet((SqlEndStatementSet) validated));
+        } else if (validated instanceof SqlSet) {
+            return Optional.of(converter.convertSet((SqlSet) validated));
+        } else if (validated instanceof SqlReset) {
+            return Optional.of(converter.convertReset((SqlReset) validated));
         } else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
             return Optional.of(converter.convertSqlQuery(validated));
         } else {
@@ -931,6 +939,20 @@ public class SqlToOperationConverter {
         return new ShowModulesOperation(sqlShowModules.requireFull());
     }
 
+    /** Convert SET ['key' = 'value']. */
+    private Operation convertSet(SqlSet sqlSet) {
+        if (sqlSet.getKey() == null && sqlSet.getValue() == null) {
+            return new SetOperation();
+        } else {
+            return new SetOperation(sqlSet.getKeyString(), sqlSet.getValueString());
+        }
+    }
+
+    /** Convert RESET ['key']. */
+    private Operation convertReset(SqlReset sqlReset) {
+        return new ResetOperation(sqlReset.getKeyString());
+    }
+
     /** Fallback method for sql query. */
     private Operation convertSqlQuery(SqlNode node) {
         return toQueryOperation(flinkPlanner, node);
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ResetOperationParseStrategy.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ResetOperationParseStrategy.java
index 708834d..fcd0bea 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ResetOperationParseStrategy.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/ResetOperationParseStrategy.java
@@ -31,7 +31,7 @@ public class ResetOperationParseStrategy extends AbstractRegexParseStrategy {
     static final ResetOperationParseStrategy INSTANCE = new ResetOperationParseStrategy();
 
     private ResetOperationParseStrategy() {
-        super(Pattern.compile("RESET(\\s+(?<key>\\S+)\\s*)?", DEFAULT_PATTERN_FLAGS));
+        super(Pattern.compile("RESET(\\s+(?<key>[^'\\s]+)\\s*)?", DEFAULT_PATTERN_FLAGS));
     }
 
     @Override
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/SetOperationParseStrategy.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/SetOperationParseStrategy.java
index a7758a3..61b0453 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/SetOperationParseStrategy.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/parse/SetOperationParseStrategy.java
@@ -35,7 +35,7 @@ public class SetOperationParseStrategy extends AbstractRegexParseStrategy {
     protected SetOperationParseStrategy() {
         super(
                 Pattern.compile(
-                        "SET(\\s+(?<key>\\S+)\\s*=\\s*('(?<quotedVal>[^']*)'|(?<val>\\S+)))?",
+                        "SET(\\s+(?<key>[^'\\s]+)\\s*=\\s*('(?<quotedVal>[^']*)'|(?<val>\\S+)))?",
                         DEFAULT_PATTERN_FLAGS));
     }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index e036e17..ad770c9 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -23,7 +23,6 @@ import org.apache.flink.sql.parser.dml.{SqlBeginStatementSet, SqlEndStatementSet
 import org.apache.flink.sql.parser.dql._
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader
-
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.config.NullCollation
 import org.apache.calcite.plan._
@@ -37,15 +36,13 @@ import org.apache.calcite.sql.validate.SqlValidator
 import org.apache.calcite.sql.{SqlExplain, SqlKind, SqlNode, SqlOperatorTable}
 import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
 import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
-import org.apache.flink.sql.parser.ddl.SqlUseModules
+import org.apache.flink.sql.parser.ddl.{SqlReset, SqlSet, SqlUseModules}
 import org.apache.flink.table.planner.parse.CalciteParser
 
 import javax.annotation.Nullable
-
 import java.lang.{Boolean => JBoolean}
 import java.util
 import java.util.function.{Function => JFunction}
-
 import scala.collection.JavaConverters._
 
 /**
@@ -140,7 +137,9 @@ class FlinkPlannerImpl(
         || sqlNode.isInstanceOf[SqlUnloadModule]
         || sqlNode.isInstanceOf[SqlUseModules]
         || sqlNode.isInstanceOf[SqlBeginStatementSet]
-        || sqlNode.isInstanceOf[SqlEndStatementSet]) {
+        || sqlNode.isInstanceOf[SqlEndStatementSet]
+        || sqlNode.isInstanceOf[SqlSet]
+        || sqlNode.isInstanceOf[SqlReset]) {
         return sqlNode
       }
       sqlNode match {
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
index 1c24173..f8ea28b 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java
@@ -99,7 +99,8 @@ public class ParserImplTest {
                     forStatement("SET pipeline.name = ' '").summary("SET pipeline.name= "),
                     forStatement("SET execution.runtime-type=")
                             // TODO: the exception message should be "no value defined"
-                            .error("SQL parse failed. Encountered \"-\" at line 1, column 22"));
+                            .error(
+                                    "SQL parse failed. Encountered \"execution\" at line 1, column 5"));
 
     @Test
     public void testParseLegalStatements() {
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index fe16f55..5fd4dbd 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -58,6 +58,8 @@ import org.apache.flink.table.operations.UnloadModuleOperation;
 import org.apache.flink.table.operations.UseCatalogOperation;
 import org.apache.flink.table.operations.UseDatabaseOperation;
 import org.apache.flink.table.operations.UseModulesOperation;
+import org.apache.flink.table.operations.command.ResetOperation;
+import org.apache.flink.table.operations.command.SetOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
 import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
@@ -1411,6 +1413,31 @@ public class SqlToOperationConverterTest {
         assertEquals("END", endStatementSetOperation.asSummaryString());
     }
 
+    @Test
+    public void testSet() {
+        Operation operation1 = parse("SET", SqlDialect.DEFAULT);
+        assertTrue(operation1 instanceof SetOperation);
+        assertFalse(((SetOperation) operation1).getKey().isPresent());
+        assertFalse(((SetOperation) operation1).getValue().isPresent());
+
+        Operation operation2 = parse("SET 'test-key' = 'test-value'", SqlDialect.DEFAULT);
+        assertTrue(operation2 instanceof SetOperation);
+        assertEquals("test-key", ((SetOperation) operation2).getKey().get());
+        assertEquals("test-value", ((SetOperation) operation2).getValue().get());
+    }
+
+    @Test
+    public void testReset() {
+        Operation operation1 = parse("RESET", SqlDialect.DEFAULT);
+        assertTrue(operation1 instanceof ResetOperation);
+        assertFalse(((ResetOperation) operation1).getKey().isPresent());
+
+        Operation operation2 = parse("RESET 'test-key'", SqlDialect.DEFAULT);
+        assertTrue(operation2 instanceof ResetOperation);
+        assertTrue(((ResetOperation) operation2).getKey().isPresent());
+        assertEquals("test-key", ((ResetOperation) operation2).getKey().get());
+    }
+
     // ~ Tool Methods ----------------------------------------------------------
 
     private static TestItem createTestItem(Object... args) {
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/parse/ResetOperationParseStrategyTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/parse/ResetOperationParseStrategyTest.java
new file mode 100644
index 0000000..6bd848b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/parse/ResetOperationParseStrategyTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.parse;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link ResetOperationParseStrategy}. */
+public class ResetOperationParseStrategyTest {
+
+    @Test
+    public void testMatches() {
+        assertTrue(ResetOperationParseStrategy.INSTANCE.match("RESET"));
+        assertTrue(ResetOperationParseStrategy.INSTANCE.match("RESET table.planner"));
+    }
+
+    @Test
+    public void testDoesNotMatchQuotedKey() {
+        assertFalse(ResetOperationParseStrategy.INSTANCE.match("RESET 'table.planner'"));
+    }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/parse/SetOperationParseStrategyTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/parse/SetOperationParseStrategyTest.java
new file mode 100644
index 0000000..9703f97
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/parse/SetOperationParseStrategyTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.parse;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link SetOperationParseStrategy}. */
+public class SetOperationParseStrategyTest {
+
+    @Test
+    public void testMatches() {
+        assertTrue(SetOperationParseStrategy.INSTANCE.match("SET"));
+        assertTrue(SetOperationParseStrategy.INSTANCE.match("SET table.planner = blink"));
+        assertTrue(SetOperationParseStrategy.INSTANCE.match("SET table.planner = 'blink'"));
+    }
+
+    @Test
+    public void testDoesNotMatchQuotedKey() {
+        assertFalse(SetOperationParseStrategy.INSTANCE.match("SET 'table.planner' = blink"));
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ExtendedParser.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ExtendedParser.java
index a1c98f1..57d9f95 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ExtendedParser.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/parse/ExtendedParser.java
@@ -28,7 +28,7 @@ import java.util.Optional;
 /**
  * {@link ExtendedParser} is used for parsing some special command which can't supported by {@link
  * CalciteParser}, e.g. {@code SET key=value} contains special characters in key and value
- * identifier. It's also good to move some parsring here to avoid introducing new reserved keywords.
+ * identifier. It's also good to move some parsing here to avoid introducing new reserved keywords.
  */
 public class ExtendedParser {