You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/06 12:08:58 UTC

[flink] branch release-1.9 updated: [FLINK-13540][table-api] Fix DDL parser doesn't support number or minus character in properties key

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new cf07492  [FLINK-13540][table-api] Fix DDL parser doesn't support number or minus character in properties key
cf07492 is described below

commit cf07492f6ba9f244cf3f98e4e41d9cfab58951cd
Author: yuzhao.cyz <yu...@alibaba-inc.com>
AuthorDate: Mon Aug 5 16:23:45 2019 +0800

    [FLINK-13540][table-api] Fix DDL parser doesn't support number or minus character in properties key
    
    In this commit, we parse the keys of DDL table properties as string literals instead of identifiers, so every property key must now be enclosed in single quotes.
    
    This closes #9361
---
 flink-python/pyflink/table/table_environment.py    |  22 +--
 .../src/main/codegen/data/Parser.tdd               |   1 +
 .../src/main/codegen/includes/parserImpls.ftl      |  12 +-
 .../flink/sql/parser/ddl/SqlCreateTable.java       |   2 +-
 .../flink/sql/parser/ddl/SqlTableOption.java       |  85 ++++++++++++
 .../flink/sql/parser/FlinkDDLDataTypeTest.java     |   2 +-
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 148 +++++++++++++--------
 .../apache/flink/table/api/TableEnvironment.java   |  22 +--
 .../operations/SqlToOperationConverter.java        |  10 +-
 .../table/sqlexec/SqlToOperationConverterTest.java |  41 +++++-
 .../table/planner/catalog/CatalogTableITCase.scala |  44 +++---
 .../table/sqlexec/SqlToOperationConverter.java     |  10 +-
 .../table/sqlexec/SqlToOperationConverterTest.java |  40 +++++-
 .../flink/table/catalog/CatalogTableITCase.scala   |  44 +++---
 14 files changed, 336 insertions(+), 147 deletions(-)

diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index b7ad5b5..85e31c1 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -303,9 +303,9 @@ class TableEnvironment(object):
                 b bigint,
                 c varchar
             ) with (
-                connector.type = 'filesystem',
-                format.type = 'csv',
-                connector.path = 'xxx'
+                'connector.type' = 'filesystem',
+                'format.type' = 'csv',
+                'connector.path' = 'xxx'
             )
 
         SQL queries can directly execute as follows:
@@ -317,11 +317,11 @@ class TableEnvironment(object):
             ...     a int,
             ...     b varchar
             ... ) with (
-            ...     connector.type = 'kafka',
-            ...     `update-mode` = 'append',
-            ...     connector.topic = 'xxx',
-            ...     connector.properties.0.key = 'k0',
-            ...     connector.properties.0.value = 'v0'
+            ...     'connector.type' = 'kafka',
+            ...     'update-mode' = 'append',
+            ...     'connector.topic' = 'xxx',
+            ...     'connector.properties.0.key' = 'k0',
+            ...     'connector.properties.0.value' = 'v0'
             ... )
             ... '''
 
@@ -331,9 +331,9 @@ class TableEnvironment(object):
             ...     a int,
             ...     b varchar
             ... ) with (
-            ...     connector.type = 'filesystem',
-            ...     format.type = 'csv',
-            ...     connector.path = 'xxx'
+            ...     'connector.type' = 'filesystem',
+            ...     'format.type' = 'csv',
+            ...     'connector.path' = 'xxx'
             ... )
             ... '''
 
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 5cefc93..6207a89 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -28,6 +28,7 @@
     "org.apache.flink.sql.parser.ddl.SqlCreateView",
     "org.apache.flink.sql.parser.ddl.SqlDropView",
     "org.apache.flink.sql.parser.ddl.SqlTableColumn",
+    "org.apache.flink.sql.parser.ddl.SqlTableOption",
     "org.apache.flink.sql.parser.dml.RichSqlInsert",
     "org.apache.flink.sql.parser.dml.RichSqlInsertKeyword",
     "org.apache.flink.sql.parser.type.SqlArrayType",
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index ae66846..fe2e3f3 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -115,18 +115,18 @@ void UniqueKey(List<SqlNodeList> list) :
     }
 }
 
-SqlNode PropertyValue() :
+SqlNode TableOption() :
 {
-    SqlIdentifier key;
+    SqlNode key;
     SqlNode value;
     SqlParserPos pos;
 }
 {
-    key = CompoundIdentifier()
+    key = StringLiteral()
     { pos = getPos(); }
     <EQ> value = StringLiteral()
     {
-        return new SqlProperty(key, value, getPos());
+        return new SqlTableOption(key, value, getPos());
     }
 }
 
@@ -140,12 +140,12 @@ SqlNodeList TableProperties():
 {
     <LPAREN> { span = span(); }
     [
-        property = PropertyValue()
+        property = TableOption()
         {
             proList.add(property);
         }
         (
-            <COMMA> property = PropertyValue()
+            <COMMA> property = TableOption()
             {
                 proList.add(property);
             }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index ec9e562..980aec4 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -212,7 +212,7 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
 	 *     col2 varchar,
 	 *     col3 as to_timestamp(col2)
 	 *   ) with (
-	 *     connector = 'csv'
+	 *     'connector' = 'csv'
 	 *   )
 	 * </pre>
 	 * we would return a query like:
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableOption.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableOption.java
new file mode 100644
index 0000000..0564783
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableOption.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A table option of a DDL statement: a key-value pair
+ * in which both the key and the value are string literals.
+ */
+public class SqlTableOption extends SqlCall {
+	/** Use this operator only if you don't have a better one. */
+	protected static final SqlOperator OPERATOR =
+		new SqlSpecialOperator("TableOption", SqlKind.OTHER);
+
+	private final SqlNode key;
+	private final SqlNode value;
+
+	public SqlTableOption(SqlNode key, SqlNode value, SqlParserPos pos) {
+		super(pos);
+		this.key = requireNonNull(key, "Option key is missing");
+		this.value = requireNonNull(value, "Option value is missing");
+	}
+
+	public SqlNode getKey() {
+		return key;
+	}
+
+	public SqlNode getValue() {
+		return value;
+	}
+
+	public String getKeyString() {
+		return ((NlsString) SqlLiteral.value(key)).getValue();
+	}
+
+	public String getValueString() {
+		return ((NlsString) SqlLiteral.value(value)).getValue();
+	}
+
+	@Override
+	public SqlOperator getOperator() {
+		return OPERATOR;
+	}
+
+	@Override
+	public List<SqlNode> getOperandList() {
+		return ImmutableNullableList.of(key, value);
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		key.unparse(writer, leftPrec, rightPrec);
+		writer.keyword("=");
+		value.unparse(writer, leftPrec, rightPrec);
+	}
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
index 4b4499f..410e30d 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
@@ -67,7 +67,7 @@ public class FlinkDDLDataTypeTest {
 	private static final String DDL_FORMAT = "create table t1 (\n" +
 		"  f0 %s\n" +
 		") with (\n" +
-		"  k1 = 'v1'\n" +
+		"  'k1' = 'v1'\n" +
 		")";
 
 	@Parameterized.Parameters(name = "{index}: {0}")
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index cc892ab..97bb093 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -85,8 +85,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				")\n" +
 				"PARTITIONED BY (a, h)\n" +
 				"  with (\n" +
-				"    connector = 'kafka', \n" +
-				"    kafka.topic = 'log.test'\n" +
+				"    'connector' = 'kafka', \n" +
+				"    'kafka.topic' = 'log.test'\n" +
 				")\n",
 			"CREATE TABLE `TBL1` (\n" +
 				"  `A`  BIGINT,\n" +
@@ -99,8 +99,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				")\n" +
 				"PARTITIONED BY (`A`, `H`)\n" +
 				"WITH (\n" +
-				"  `CONNECTOR` = 'kafka',\n" +
-				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				"  'connector' = 'kafka',\n" +
+				"  'kafka.topic' = 'log.test'\n" +
 				")");
 	}
 
@@ -118,8 +118,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"comment 'test table comment ABC.'\n" +
 				"PARTITIONED BY (a, h)\n" +
 				"  with (\n" +
-				"    connector = 'kafka', \n" +
-				"    kafka.topic = 'log.test'\n" +
+				"    'connector' = 'kafka', \n" +
+				"    'kafka.topic' = 'log.test'\n" +
 				")\n",
 			"CREATE TABLE `TBL1` (\n" +
 				"  `A`  BIGINT  COMMENT 'test column comment AAA.',\n" +
@@ -133,8 +133,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"COMMENT 'test table comment ABC.'\n" +
 				"PARTITIONED BY (`A`, `H`)\n" +
 				"WITH (\n" +
-				"  `CONNECTOR` = 'kafka',\n" +
-				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				"  'connector' = 'kafka',\n" +
+				"  'kafka.topic' = 'log.test'\n" +
 				")");
 	}
 
@@ -153,8 +153,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"comment 'test table comment ABC.'\n" +
 				"PARTITIONED BY (a, h)\n" +
 				"  with (\n" +
-				"    connector = 'kafka', \n" +
-				"    kafka.topic = 'log.test'\n" +
+				"    'connector' = 'kafka', \n" +
+				"    'kafka.topic' = 'log.test'\n" +
 				")\n",
 			"CREATE TABLE `TBL1` (\n" +
 				"  `A`  BIGINT  COMMENT 'test column comment AAA.',\n" +
@@ -169,8 +169,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"COMMENT 'test table comment ABC.'\n" +
 				"PARTITIONED BY (`A`, `H`)\n" +
 				"WITH (\n" +
-				"  `CONNECTOR` = 'kafka',\n" +
-				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				"  'connector' = 'kafka',\n" +
+				"  'kafka.topic' = 'log.test'\n" +
 				")");
 	}
 
@@ -184,8 +184,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"  WATERMARK FOR a AS BOUNDED WITH DELAY 1000 MILLISECOND\n" +
 				")\n" +
 				"  with (\n" +
-				"    connector = 'kafka', \n" +
-				"    kafka.topic = 'log.test'\n" +
+				"    'connector' = 'kafka', \n" +
+				"    'kafka.topic' = 'log.test'\n" +
 				")\n",
 			"CREATE TABLE `TBL1` (\n" +
 				"  `A`  BIGINT,\n" +
@@ -193,8 +193,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"  `C` AS (2 * (`A` + 1)),\n" +
 				"  WATERMARK FOR `A` AS BOUNDED WITH DELAY 1000 MILLISECOND\n" +
 				") WITH (\n" +
-				"  `CONNECTOR` = 'kafka',\n" +
-				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				"  'connector' = 'kafka',\n" +
+				"  'kafka.topic' = 'log.test'\n" +
 				")");
 	}
 
@@ -208,8 +208,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"  WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 DAY\n" +
 				")\n" +
 				"  with (\n" +
-				"    connector = 'kafka', \n" +
-				"    kafka.topic = 'log.test'\n" +
+				"    'connector' = 'kafka', \n" +
+				"    'kafka.topic' = 'log.test'\n" +
 				")\n",
 			"CREATE TABLE `TBL1` (\n" +
 				"  `A`  BIGINT,\n" +
@@ -217,8 +217,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"  `C` AS (2 * (`A` + 1)),\n" +
 				"  WATERMARK `WK` FOR `A` AS BOUNDED WITH DELAY 1000 DAY\n" +
 				") WITH (\n" +
-				"  `CONNECTOR` = 'kafka',\n" +
-				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				"  'connector' = 'kafka',\n" +
+				"  'kafka.topic' = 'log.test'\n" +
 				")");
 	}
 
@@ -232,8 +232,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"  WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 HOUR\n" +
 				")\n" +
 				"  with (\n" +
-				"    connector = 'kafka', \n" +
-				"    kafka.topic = 'log.test'\n" +
+				"    'connector' = 'kafka', \n" +
+				"    'kafka.topic' = 'log.test'\n" +
 				")\n",
 			"CREATE TABLE `TBL1` (\n" +
 				"  `A`  BIGINT,\n" +
@@ -241,8 +241,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"  `C` AS (2 * (`A` + 1)),\n" +
 				"  WATERMARK `WK` FOR `A` AS BOUNDED WITH DELAY 1000 HOUR\n" +
 				") WITH (\n" +
-				"  `CONNECTOR` = 'kafka',\n" +
-				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				"  'connector' = 'kafka',\n" +
+				"  'kafka.topic' = 'log.test'\n" +
 				")");
 	}
 
@@ -256,8 +256,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"  WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 MINUTE\n" +
 				")\n" +
 				"  with (\n" +
-				"    connector = 'kafka', \n" +
-				"    kafka.topic = 'log.test'\n" +
+				"    'connector' = 'kafka', \n" +
+				"    'kafka.topic' = 'log.test'\n" +
 				")\n",
 			"CREATE TABLE `TBL1` (\n" +
 				"  `A`  BIGINT,\n" +
@@ -265,8 +265,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"  `C` AS (2 * (`A` + 1)),\n" +
 				"  WATERMARK `WK` FOR `A` AS BOUNDED WITH DELAY 1000 MINUTE\n" +
 				") WITH (\n" +
-				"  `CONNECTOR` = 'kafka',\n" +
-				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				"  'connector' = 'kafka',\n" +
+				"  'kafka.topic' = 'log.test'\n" +
 				")");
 	}
 
@@ -280,8 +280,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"  WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 SECOND\n" +
 				")\n" +
 				"  with (\n" +
-				"    connector = 'kafka', \n" +
-				"    kafka.topic = 'log.test'\n" +
+				"    'connector' = 'kafka', \n" +
+				"    'kafka.topic' = 'log.test'\n" +
 				")\n",
 			"CREATE TABLE `TBL1` (\n" +
 				"  `A`  BIGINT,\n" +
@@ -289,8 +289,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"  `C` AS (2 * (`A` + 1)),\n" +
 				"  WATERMARK `WK` FOR `A` AS BOUNDED WITH DELAY 1000 SECOND\n" +
 				") WITH (\n" +
-				"  `CONNECTOR` = 'kafka',\n" +
-				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				"  'connector' = 'kafka',\n" +
+				"  'kafka.topic' = 'log.test'\n" +
 				")");
 	}
 
@@ -304,8 +304,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"  WATERMARK wk FOR a AS BOUNDED WITH DELAY ^-^1000 SECOND\n" +
 				")\n" +
 				"  with (\n" +
-				"    connector = 'kafka', \n" +
-				"    kafka.topic = 'log.test'\n" +
+				"    'connector' = 'kafka', \n" +
+				"    'kafka.topic' = 'log.test'\n" +
 				")\n",
 			"(?s).*Encountered \"-\" at line 5, column 44.\n" +
 				"Was expecting:\n" +
@@ -323,8 +323,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"  WATERMARK wk FOR a AS ASCENDING\n" +
 				")\n" +
 				"  with (\n" +
-				"    connector = 'kafka', \n" +
-				"    kafka.topic = 'log.test'\n" +
+				"    'connector' = 'kafka', \n" +
+				"    'kafka.topic' = 'log.test'\n" +
 				")\n",
 			"CREATE TABLE `TBL1` (\n" +
 				"  `A`  BIGINT,\n" +
@@ -332,8 +332,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"  `C` AS (2 * (`A` + 1)),\n" +
 				"  WATERMARK `WK` FOR `A` AS ASCENDING\n" +
 				") WITH (\n" +
-				"  `CONNECTOR` = 'kafka',\n" +
-				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				"  'connector' = 'kafka',\n" +
+				"  'kafka.topic' = 'log.test'\n" +
 				")");
 	}
 
@@ -347,8 +347,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"  WATERMARK wk FOR a AS FROM_SOURCE\n" +
 				")\n" +
 				"  with (\n" +
-				"    connector = 'kafka', \n" +
-				"    kafka.topic = 'log.test'\n" +
+				"    'connector' = 'kafka', \n" +
+				"    'kafka.topic' = 'log.test'\n" +
 				")\n",
 			"CREATE TABLE `TBL1` (\n" +
 				"  `A`  BIGINT,\n" +
@@ -356,8 +356,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"  `C` AS (2 * (`A` + 1)),\n" +
 				"  WATERMARK `WK` FOR `A` AS FROM_SOURCE\n" +
 				") WITH (\n" +
-				"  `CONNECTOR` = 'kafka',\n" +
-				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				"  'connector' = 'kafka',\n" +
+				"  'kafka.topic' = 'log.test'\n" +
 				")");
 	}
 
@@ -370,8 +370,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			"  d MULTISET<varchar>,\n" +
 			"  PRIMARY KEY (a, b) \n" +
 			") with (\n" +
-			"  x = 'y', \n" +
-			"  asd = 'data'\n" +
+			"  'x' = 'y', \n" +
+			"  'asd' = 'data'\n" +
 			")\n", "CREATE TABLE `TBL1` (\n" +
 			"  `A`  ARRAY< BIGINT >,\n" +
 			"  `B`  MAP< INTEGER, VARCHAR >,\n" +
@@ -379,8 +379,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			"  `D`  MULTISET< VARCHAR >,\n" +
 			"  PRIMARY KEY (`A`, `B`)\n" +
 			") WITH (\n" +
-			"  `X` = 'y',\n" +
-			"  `ASD` = 'data'\n" +
+			"  'x' = 'y',\n" +
+			"  'asd' = 'data'\n" +
 			")");
 	}
 
@@ -393,8 +393,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			"  d MULTISET<ARRAY<int>>,\n" +
 			"  PRIMARY KEY (a, b) \n" +
 			") with (\n" +
-			"  x = 'y', \n" +
-			"  asd = 'data'\n" +
+			"  'x' = 'y', \n" +
+			"  'asd' = 'data'\n" +
 			")\n", "CREATE TABLE `TBL1` (\n" +
 			"  `A`  ARRAY< ARRAY< BIGINT > >,\n" +
 			"  `B`  MAP< MAP< INTEGER, VARCHAR >, ARRAY< VARCHAR > >,\n" +
@@ -402,8 +402,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			"  `D`  MULTISET< ARRAY< INTEGER > >,\n" +
 			"  PRIMARY KEY (`A`, `B`)\n" +
 			") WITH (\n" +
-			"  `X` = 'y',\n" +
-			"  `ASD` = 'data'\n" +
+			"  'x' = 'y',\n" +
+			"  'asd' = 'data'\n" +
 			")");
 	}
 
@@ -415,8 +415,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			"  toTimestamp^(^b, 'yyyy-MM-dd HH:mm:ss'), \n" +
 			"  PRIMARY KEY (a, b) \n" +
 			") with (\n" +
-			"  x = 'y', \n" +
-			"  asd = 'data'\n" +
+			"  'x' = 'y', \n" +
+			"  'asd' = 'data'\n" +
 			")\n", "(?s).*Encountered \"\\(\" at line 4, column 14.\n" +
 			"Was expecting one of:\n" +
 			"    \"AS\" ...\n" +
@@ -435,8 +435,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			"  c int,\n" +
 			"  PRIMARY KEY (a, b) \n" +
 			") with (\n" +
-			"  x = 'y', \n" +
-			"  asd = 'data'\n" +
+			"  'x' = 'y', \n" +
+			"  'asd' = 'data'\n" +
 			")\n";
 		String expected = "`A`, (`A` + 1) AS `F`, `B`, "
 			+ "`TOTIMESTAMP`(`B`, 'yyyy-MM-dd HH:mm:ss') AS `TS`, "
@@ -454,13 +454,51 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			") PARTITIONED BY (\n" +
 			"  c,\n" +
 			"  d\n" +
-			") with ( x = 'y', asd = 'dada')";
+			") with ( 'x' = 'y', 'asd' = 'dada')";
 		sql(sql).node(new ValidationMatcher()
 			.fails("Partition column [C] not defined in columns, at line 6, column 3"));
 
 	}
 
 	@Test
+	public void testCreateTableWithMinusInOptionKey() {
+		String sql = "create table source_table(\n" +
+			"  a int,\n" +
+			"  b bigint,\n" +
+			"  c string\n" +
+			") with (\n" +
+			"  'a-b-c-d124' = 'ab',\n" +
+			"  'a.b.1.c' = 'aabb',\n" +
+			"  'a.b-c-connector.e-f.g' = 'ada',\n" +
+			"  'a.b-c-d.e-1231.g' = 'ada',\n" +
+			"  'a.b-c-d.*' = 'adad')\n";
+		String expected = "CREATE TABLE `SOURCE_TABLE` (\n" +
+			"  `A`  INTEGER,\n" +
+			"  `B`  BIGINT,\n" +
+			"  `C`  STRING\n" +
+			") WITH (\n" +
+			"  'a-b-c-d124' = 'ab',\n" +
+			"  'a.b.1.c' = 'aabb',\n" +
+			"  'a.b-c-connector.e-f.g' = 'ada',\n" +
+			"  'a.b-c-d.e-1231.g' = 'ada',\n" +
+			"  'a.b-c-d.*' = 'adad'\n" +
+			")";
+		check(sql, expected);
+	}
+
+	@Test
+	public void testCreateTableWithOptionKeyAsIdentifier() {
+		String sql = "create table source_table(\n" +
+			"  a int,\n" +
+			"  b bigint,\n" +
+			"  c string\n" +
+			") with (\n" +
+			"  ^a^.b.c = 'ab',\n" +
+			"  a.b.c1 = 'aabb')\n";
+		sql(sql).fails("(?s).*Encountered \"a\" at line 6, column 3.\n.*");
+	}
+
+	@Test
 	public void testDropTable() {
 		String sql = "DROP table catalog1.db1.tbl1";
 		check(sql, "DROP TABLE `CATALOG1`.`DB1`.`TBL1`");
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index d7e9d7e..8f8364f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -372,9 +372,9 @@ public interface TableEnvironment {
 	 *      b bigint,
 	 *      c varchar
 	 *    ) with (
-	 *      connector.type = 'filesystem',
-	 *      format.type = 'csv',
-	 *      connector.path = 'xxx'
+	 *      'connector.type' = 'filesystem',
+	 *      'format.type' = 'csv',
+	 *      'connector.path' = 'xxx'
 	 *    )
 	 * </pre></blockquote>
 	 *
@@ -385,20 +385,20 @@ public interface TableEnvironment {
 	 *                        a int,
 	 *                        b varchar
 	 *                      ) with (
-	 *                        connector.type = 'filesystem',
-	 *                        format.type = 'csv',
-	 *                        connector.path = 'xxx'
+	 *                        'connector.type' = 'filesystem',
+	 *                        'format.type' = 'csv',
+	 *                        'connector.path' = 'xxx'
 	 *                      )";
 	 *
 	 *    String sourceDDL ="create table sourceTable(
 	 *                        a int,
 	 *                        b varchar
 	 *                      ) with (
-	 *                        connector.type = 'kafka',
-	 *                        `update-mode` = 'append',
-	 *                        connector.topic = 'xxx',
-	 *                        connector.properties.0.key = 'k0',
-	 *                        connector.properties.0.value = 'v0',
+	 *                        'connector.type' = 'kafka',
+	 *                        'update-mode' = 'append',
+	 *                        'connector.topic' = 'xxx',
+	 *                        'connector.properties.0.key' = 'k0',
+	 *                        'connector.properties.0.value' = 'v0',
 	 *                        ...
 	 *                      )";
 	 *
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 86df643..8e9cc80 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.table.planner.operations;
 
-import org.apache.flink.sql.parser.SqlProperty;
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
 import org.apache.flink.sql.parser.ddl.SqlDropTable;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.sql.parser.dml.RichSqlInsert;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
@@ -112,8 +112,8 @@ public class SqlToOperationConverter {
 		Map<String, String> properties = new HashMap<>();
 		if (propertyList != null) {
 			propertyList.getList().forEach(p ->
-				properties.put(((SqlProperty) p).getKeyString().toLowerCase(),
-					((SqlProperty) p).getValueString()));
+				properties.put(((SqlTableOption) p).getKeyString().toLowerCase(),
+					((SqlTableOption) p).getValueString()));
 		}
 
 		TableSchema tableSchema = createTableSchema(sqlCreateTable,
@@ -172,8 +172,8 @@ public class SqlToOperationConverter {
 	 *     b varchar,
 	 *     c as to_timestamp(b))
 	 *   with (
-	 *     connector = 'csv',
-	 *     k1 = 'v1')
+	 *     'connector' = 'csv',
+	 *     'k1' = 'v1')
 	 * </pre></blockquote>
 	 *
 	 * <p>The returned table schema contains columns (a:int, b:varchar, c:timestamp).
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 9368d00..6fa3c31 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.sqlexec;
 
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableConfig;
@@ -51,6 +52,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
 import static org.junit.Assert.assertArrayEquals;
@@ -107,8 +110,8 @@ public class SqlToOperationConverterTest {
 			")\n" +
 			"  PARTITIONED BY (a, d)\n" +
 			"  with (\n" +
-			"    connector = 'kafka', \n" +
-			"    kafka.topic = 'log.test'\n" +
+			"    'connector' = 'kafka', \n" +
+			"    'kafka.topic' = 'log.test'\n" +
 			")\n";
 		FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
 		Operation operation = parse(sql, planner);
@@ -139,13 +142,43 @@ public class SqlToOperationConverterTest {
 			")\n" +
 			"  PARTITIONED BY (a, d)\n" +
 			"  with (\n" +
-			"    connector = 'kafka', \n" +
-			"    kafka.topic = 'log.test'\n" +
+			"    'connector' = 'kafka', \n" +
+			"    'kafka.topic' = 'log.test'\n" +
 			")\n";
 		parse(sql, planner);
 	}
 
 	@Test
+	public void testCreateTableWithMinusInOptionKey() {
+		final String sql = "create table source_table(\n" +
+			"  a int,\n" +
+			"  b bigint,\n" +
+			"  c varchar\n" +
+			") with (\n" +
+			"  'a-b-c-d124' = 'ab',\n" +
+			"  'a.b-c-d.e-f.g' = 'ada',\n" +
+			"  'a.b-c-d.e-f1231.g' = 'ada',\n" +
+			"  'a.b-c-d.*' = 'adad')\n";
+		final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+		SqlNode node = planner.parse(sql);
+		assert node instanceof SqlCreateTable;
+		Operation operation = SqlToOperationConverter.convert(planner, node);
+		assert operation instanceof CreateTableOperation;
+		CreateTableOperation op = (CreateTableOperation) operation;
+		CatalogTable catalogTable = op.getCatalogTable();
+		Map<String, String> properties = catalogTable.toProperties()
+			.entrySet().stream()
+			.filter(e -> !e.getKey().contains("schema"))
+			.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+		Map<String, String> sortedProperties = new TreeMap<>(properties);
+		final String expected = "{a-b-c-d124=ab, "
+			+ "a.b-c-d.*=adad, "
+			+ "a.b-c-d.e-f.g=ada, "
+			+ "a.b-c-d.e-f1231.g=ada}";
+		assertEquals(expected, sortedProperties.toString());
+	}
+
+	@Test
 	public void testSqlInsertWithStaticPartition() {
 		final String sql = "insert into t1 partition(a=1) select b, c, d from t2";
 		FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.HIVE);
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index 5b9665b..3996d6d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -101,7 +101,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  b varchar,
         |  c int
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val sinkDDL =
@@ -111,7 +111,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  b varchar,
         |  c int
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val query =
@@ -137,7 +137,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  b varchar,
         |  c int
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val sinkDDL =
@@ -147,7 +147,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  b varchar,
         |  c as a + 1
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val query =
@@ -186,7 +186,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  b int,
         |  c int
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val sinkDDL =
@@ -197,7 +197,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  c int,
         |  d int
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val query =
@@ -241,7 +241,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  b int,
         |  c int
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val sinkDDL =
@@ -250,7 +250,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  a int,
         |  b int
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val query =
@@ -281,7 +281,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  c as proctime,
         |  primary key(a)
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val sinkDDL =
@@ -290,7 +290,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  a int,
         |  b int
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val query =
@@ -322,7 +322,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  primary key(a),
         |  WATERMARK wm FOR a AS BOUNDED WITH DELAY 1000 MILLISECOND
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val sinkDDL =
@@ -331,7 +331,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  a bigint,
         |  b bigint
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val query =
@@ -363,7 +363,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  c as proctime,
         |  primary key(a)
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val sinkDDL =
@@ -372,7 +372,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  a int,
         |  b int
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val query =
@@ -404,7 +404,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  primary key(a),
         |  WATERMARK wm FOR a AS BOUNDED WITH DELAY 1000 MILLISECOND
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val sinkDDL =
@@ -413,7 +413,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  a bigint,
         |  b bigint
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val query =
@@ -438,7 +438,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  b bigint,
         |  c varchar
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val ddl2 =
@@ -447,7 +447,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  a bigint,
         |  b bigint
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
 
@@ -467,7 +467,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  b bigint,
         |  c varchar
         |) with (
-        |  connector = 'COLLECTION'
+        | 'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val ddl2 =
@@ -476,7 +476,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  a bigint,
         |  b bigint
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
 
@@ -497,7 +497,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  b bigint,
         |  c varchar
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
 
@@ -516,7 +516,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) {
         |  b bigint,
         |  c varchar
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index e0636ef..1cefdf8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.table.sqlexec;
 
-import org.apache.flink.sql.parser.SqlProperty;
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
 import org.apache.flink.sql.parser.ddl.SqlDropTable;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.sql.parser.dml.RichSqlInsert;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
@@ -113,8 +113,8 @@ public class SqlToOperationConverter {
 		Map<String, String> properties = new HashMap<>();
 		if (propertyList != null) {
 			propertyList.getList().forEach(p ->
-				properties.put(((SqlProperty) p).getKeyString().toLowerCase(),
-					((SqlProperty) p).getValueString()));
+				properties.put(((SqlTableOption) p).getKeyString().toLowerCase(),
+					((SqlTableOption) p).getValueString()));
 		}
 
 		TableSchema tableSchema = createTableSchema(sqlCreateTable,
@@ -173,8 +173,8 @@ public class SqlToOperationConverter {
 	 *     b varchar,
 	 *     c as to_timestamp(b))
 	 *   with (
-	 *     connector = 'csv',
-	 *     k1 = 'v1')
+	 *     'connector' = 'csv',
+	 *     'k1' = 'v1')
 	 * </pre></blockquote>
 	 *
 	 * <p>The returned table schema contains columns (a:int, b:varchar, c:timestamp).
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index fe3a405..7225cbc 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -52,6 +52,8 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
 import static org.junit.Assert.assertArrayEquals;
@@ -107,8 +109,8 @@ public class SqlToOperationConverterTest {
 			")\n" +
 			"  PARTITIONED BY (a, d)\n" +
 			"  with (\n" +
-			"    connector = 'kafka', \n" +
-			"    kafka.topic = 'log.test'\n" +
+			"    'connector' = 'kafka', \n" +
+			"    'kafka.topic' = 'log.test'\n" +
 			")\n";
 		final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
 		SqlNode node = planner.parse(sql);
@@ -128,6 +130,36 @@ public class SqlToOperationConverterTest {
 				DataTypes.VARCHAR(Integer.MAX_VALUE)});
 	}
 
+	@Test
+	public void testCreateTableWithMinusInOptionKey() {
+		final String sql = "create table source_table(\n" +
+			"  a int,\n" +
+			"  b bigint,\n" +
+			"  c varchar\n" +
+			") with (\n" +
+			"  'a-b-c-d124' = 'ab',\n" +
+			"  'a.b-c-d.e-f.g' = 'ada',\n" +
+			"  'a.b-c-d.e-f1231.g' = 'ada',\n" +
+			"  'a.b-c-d.*' = 'adad')\n";
+		final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+		SqlNode node = planner.parse(sql);
+		assert node instanceof SqlCreateTable;
+		Operation operation = SqlToOperationConverter.convert(planner, node);
+		assert operation instanceof CreateTableOperation;
+		CreateTableOperation op = (CreateTableOperation) operation;
+		CatalogTable catalogTable = op.getCatalogTable();
+		Map<String, String> properties = catalogTable.toProperties()
+			.entrySet().stream()
+			.filter(e -> !e.getKey().contains("schema"))
+			.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+		Map<String, String> sortedProperties = new TreeMap<>(properties);
+		final String expected = "{a-b-c-d124=ab, "
+			+ "a.b-c-d.*=adad, "
+			+ "a.b-c-d.e-f.g=ada, "
+			+ "a.b-c-d.e-f1231.g=ada}";
+		assertEquals(expected, sortedProperties.toString());
+	}
+
 	@Test(expected = SqlConversionException.class)
 	public void testCreateTableWithPkUniqueKeys() {
 		final String sql = "CREATE TABLE tbl1 (\n" +
@@ -140,8 +172,8 @@ public class SqlToOperationConverterTest {
 			")\n" +
 			"  PARTITIONED BY (a, d)\n" +
 			"  with (\n" +
-			"    connector = 'kafka', \n" +
-			"    kafka.topic = 'log.test'\n" +
+			"    'connector' = 'kafka', \n" +
+			"    'kafka.topic' = 'log.test'\n" +
 			")\n";
 		final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
 		SqlNode node = planner.parse(sql);
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
index e5f8a76..46d098d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
@@ -113,7 +113,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  b varchar,
         |  c int
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val sinkDDL =
@@ -123,7 +123,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  b varchar,
         |  c int
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val query =
@@ -149,7 +149,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  b varchar,
         |  c int
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val sinkDDL =
@@ -159,7 +159,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  b varchar,
         |  c as a + 1
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val query =
@@ -198,7 +198,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  b int,
         |  c int
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val sinkDDL =
@@ -209,7 +209,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  c int,
         |  d int
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val query =
@@ -253,7 +253,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  b int,
         |  c int
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val sinkDDL =
@@ -262,7 +262,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  a int,
         |  b int
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val query =
@@ -293,7 +293,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  c as proctime,
         |  primary key(a)
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val sinkDDL =
@@ -302,7 +302,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  a int,
         |  b int
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val query =
@@ -334,7 +334,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  primary key(a),
         |  WATERMARK wm FOR a AS BOUNDED WITH DELAY 1000 MILLISECOND
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val sinkDDL =
@@ -343,7 +343,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  a bigint,
         |  b bigint
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val query =
@@ -375,7 +375,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  c as proctime,
         |  primary key(a)
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val sinkDDL =
@@ -384,7 +384,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  a int,
         |  b int
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val query =
@@ -416,7 +416,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  primary key(a),
         |  WATERMARK wm FOR a AS BOUNDED WITH DELAY 1000 MILLISECOND
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val sinkDDL =
@@ -425,7 +425,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  a bigint,
         |  b bigint
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val query =
@@ -450,7 +450,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  b bigint,
         |  c varchar
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val ddl2 =
@@ -459,7 +459,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  a bigint,
         |  b bigint
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
 
@@ -479,7 +479,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  b bigint,
         |  c varchar
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
     val ddl2 =
@@ -488,7 +488,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  a bigint,
         |  b bigint
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
 
@@ -509,7 +509,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  b bigint,
         |  c varchar
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin
 
@@ -528,7 +528,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
         |  b bigint,
         |  c varchar
         |) with (
-        |  connector = 'COLLECTION'
+        |  'connector' = 'COLLECTION'
         |)
       """.stripMargin