You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/11 06:52:04 UTC

[flink] branch release-1.11 updated (a76a7cc -> 2d0b447)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from a76a7cc  [FLINK-18224][docs] Add document about sql client's tableau result mode
     new 820b662  [FLINK-18030][hive] Hive UDF doesn't accept empty string literal parameters
     new 2d0b447  [FLINK-17965][sql-parser-hive] Hive dialect needs to unescape backslash in string literals

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/catalog/hive/util/HiveTableUtil.java     |  4 ++
 .../table/catalog/hive/util/HiveTypeUtil.java      | 32 +++++++---
 .../flink/connectors/hive/HiveDialectITCase.java   |  1 +
 .../hive/TableEnvHiveConnectorITCase.java          |  2 +-
 .../catalog/hive/HiveCatalogDataTypeTest.java      |  2 -
 .../flink/table/module/hive/HiveModuleTest.java    | 17 +++++
 .../flink/sql/parser/hive/ddl/HiveDDLUtils.java    | 73 ++++++++++++++++++++++
 .../sql/parser/hive/ddl/SqlAddHivePartitions.java  |  3 +
 .../parser/hive/ddl/SqlAlterHiveDatabaseProps.java |  1 +
 .../hive/ddl/SqlAlterHivePartitionRename.java      |  2 +
 .../parser/hive/ddl/SqlAlterHiveTableProps.java    |  3 +-
 .../parser/hive/ddl/SqlAlterHiveTableSerDe.java    |  1 +
 .../hive/ddl/SqlAlterHiveViewProperties.java       |  1 +
 .../sql/parser/hive/ddl/SqlCreateHiveDatabase.java |  8 ++-
 .../sql/parser/hive/ddl/SqlCreateHiveTable.java    | 17 ++++-
 .../sql/parser/hive/ddl/SqlCreateHiveView.java     | 13 +++-
 .../sql/parser/hive/dml/RichSqlHiveInsert.java     |  2 +
 .../parser/hive/FlinkHiveSqlParserImplTest.java    | 12 +++-
 18 files changed, 173 insertions(+), 21 deletions(-)


[flink] 01/02: [FLINK-18030][hive] Hive UDF doesn't accept empty string literal parameters

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 820b662b94f8362ce829fc237fd413d033f492d1
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Jun 11 14:43:29 2020 +0800

    [FLINK-18030][hive] Hive UDF doesn't accept empty string literal parameters
    
    
    This closes #12403
---
 .../table/catalog/hive/util/HiveTypeUtil.java      | 32 +++++++++++++++-------
 .../catalog/hive/HiveCatalogDataTypeTest.java      |  2 --
 .../flink/table/module/hive/HiveModuleTest.java    | 17 ++++++++++++
 3 files changed, 39 insertions(+), 12 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
index 7e7ef81..0678c17 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
@@ -182,11 +182,17 @@ public class HiveTypeUtil {
 
 		@Override
 		public TypeInfo visit(CharType charType) {
-			if (charType.getLength() > HiveChar.MAX_CHAR_LENGTH) {
-				throw new CatalogException(
-						String.format("HiveCatalog doesn't support char type with length of '%d'. " +
-									"The maximum length is %d",
-									charType.getLength(), HiveChar.MAX_CHAR_LENGTH));
+			// Flink and Hive have different length limits for CHAR. Promote it to STRING if its length is outside the
+			// range Hive supports and we're told not to check precision. This can be useful when calling Hive UDFs to process data.
+			if (charType.getLength() > HiveChar.MAX_CHAR_LENGTH || charType.getLength() < 1) {
+				if (checkPrecision) {
+					throw new CatalogException(
+							String.format("HiveCatalog doesn't support char type with length of '%d'. " +
+											"The supported length is [%d, %d]",
+									charType.getLength(), 1, HiveChar.MAX_CHAR_LENGTH));
+				} else {
+					return TypeInfoFactory.stringTypeInfo;
+				}
 			}
 			return TypeInfoFactory.getCharTypeInfo(charType.getLength());
 		}
@@ -199,11 +205,17 @@ public class HiveTypeUtil {
 			if (varCharType.getLength() == Integer.MAX_VALUE) {
 				return TypeInfoFactory.stringTypeInfo;
 			}
-			if (varCharType.getLength() > HiveVarchar.MAX_VARCHAR_LENGTH) {
-				throw new CatalogException(
-						String.format("HiveCatalog doesn't support varchar type with length of '%d'. " +
-									"The maximum length is %d",
-									varCharType.getLength(), HiveVarchar.MAX_VARCHAR_LENGTH));
+			// Flink and Hive have different length limits for VARCHAR. Promote it to STRING if its length is outside the
+			// range Hive supports and we're told not to check precision. This can be useful when calling Hive UDFs to process data.
+			if (varCharType.getLength() > HiveVarchar.MAX_VARCHAR_LENGTH || varCharType.getLength() < 1) {
+				if (checkPrecision) {
+					throw new CatalogException(
+							String.format("HiveCatalog doesn't support varchar type with length of '%d'. " +
+											"The supported length is [%d, %d]",
+									varCharType.getLength(), 1, HiveVarchar.MAX_VARCHAR_LENGTH));
+				} else {
+					return TypeInfoFactory.stringTypeInfo;
+				}
 			}
 			return TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
 		}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
index 5208d53..d24380f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
@@ -154,7 +154,6 @@ public class HiveCatalogDataTypeTest {
 		};
 
 		exception.expect(CatalogException.class);
-		exception.expectMessage("HiveCatalog doesn't support char type with length of '256'. The maximum length is 255");
 		verifyDataTypes(types);
 	}
 
@@ -165,7 +164,6 @@ public class HiveCatalogDataTypeTest {
 		};
 
 		exception.expect(CatalogException.class);
-		exception.expectMessage("HiveCatalog doesn't support varchar type with length of '65536'. The maximum length is 65535");
 		verifyDataTypes(types);
 	}
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
index 374984a..343d818 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
@@ -168,4 +168,21 @@ public class HiveModuleTest {
 
 		assertEquals("[{a=1, b=2, c=3}]", results.toString());
 	}
+
+	@Test
+	public void testEmptyStringLiteralParameters() {
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
+
+		tableEnv.unloadModule("core");
+		tableEnv.loadModule("hive", new HiveModule());
+
+		// UDF
+		List<Row> results = Lists.newArrayList(
+				tableEnv.sqlQuery("select regexp_replace('foobar','oo|ar','')").execute().collect());
+		assertEquals("[fb]", results.toString());
+
+		// GenericUDF
+		results = Lists.newArrayList(tableEnv.sqlQuery("select length('')").execute().collect());
+		assertEquals("[0]", results.toString());
+	}
 }


[flink] 02/02: [FLINK-17965][sql-parser-hive] Hive dialect needs to unescape backslash in string literals

Posted by lz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2d0b4473f3f73b3d43c38bfe5d30bc1421aead41
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Jun 11 14:45:32 2020 +0800

    [FLINK-17965][sql-parser-hive] Hive dialect needs to unescape backslash in string literals
    
    
    This closes #12378
---
 .../table/catalog/hive/util/HiveTableUtil.java     |  4 ++
 .../flink/connectors/hive/HiveDialectITCase.java   |  1 +
 .../hive/TableEnvHiveConnectorITCase.java          |  2 +-
 .../flink/sql/parser/hive/ddl/HiveDDLUtils.java    | 73 ++++++++++++++++++++++
 .../sql/parser/hive/ddl/SqlAddHivePartitions.java  |  3 +
 .../parser/hive/ddl/SqlAlterHiveDatabaseProps.java |  1 +
 .../hive/ddl/SqlAlterHivePartitionRename.java      |  2 +
 .../parser/hive/ddl/SqlAlterHiveTableProps.java    |  3 +-
 .../parser/hive/ddl/SqlAlterHiveTableSerDe.java    |  1 +
 .../hive/ddl/SqlAlterHiveViewProperties.java       |  1 +
 .../sql/parser/hive/ddl/SqlCreateHiveDatabase.java |  8 ++-
 .../sql/parser/hive/ddl/SqlCreateHiveTable.java    | 17 ++++-
 .../sql/parser/hive/ddl/SqlCreateHiveView.java     | 13 +++-
 .../sql/parser/hive/dml/RichSqlHiveInsert.java     |  2 +
 .../parser/hive/FlinkHiveSqlParserImplTest.java    | 12 +++-
 15 files changed, 134 insertions(+), 9 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index a390fbd..d5d5413 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -252,6 +252,10 @@ public class HiveTableUtil {
 			String key = prop.equals(HiveTableRowFormat.COLLECTION_DELIM) ?
 					serdeConstants.COLLECTION_DELIM : prop.substring(SERDE_INFO_PROP_PREFIX.length());
 			sd.getSerdeInfo().getParameters().put(key, value);
+			// make sure FIELD_DELIM and SERIALIZATION_FORMAT are consistent
+			if (key.equals(serdeConstants.FIELD_DELIM)) {
+				sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, value);
+			}
 		}
 	}
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 12730b5..4b588bf 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -172,6 +172,7 @@ public class HiveDialectITCase {
 		tableEnv.executeSql("create table tbl4 (x int,y smallint) row format delimited fields terminated by '|' lines terminated by '\n'");
 		hiveTable = hiveCatalog.getHiveTable(new ObjectPath("default", "tbl4"));
 		assertEquals("|", hiveTable.getSd().getSerdeInfo().getParameters().get(serdeConstants.FIELD_DELIM));
+		assertEquals("|", hiveTable.getSd().getSerdeInfo().getParameters().get(serdeConstants.SERIALIZATION_FORMAT));
 		assertEquals("\n", hiveTable.getSd().getSerdeInfo().getParameters().get(serdeConstants.LINE_DELIM));
 
 		tableEnv.executeSql("create table tbl5 (m map<bigint,string>) row format delimited collection items terminated by ';' " +
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
index 367edb9..dd12899 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
@@ -581,7 +581,7 @@ public class TableEnvHiveConnectorITCase {
 		try {
 			tableEnv.executeSql("create table db1.src (x int,y string) " +
 					"row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe' " +
-					"with serdeproperties ('input.regex'='([\\d]+)\\u0001([\\S]+)')");
+					"with serdeproperties ('input.regex'='([\\\\d]+)\\u0001([\\\\S]+)')");
 			HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
 					.addRow(new Object[]{1, "a"})
 					.addRow(new Object[]{2, "ab"})
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java
index 934e76a..478aac8 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/HiveDDLUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.sql.parser.hive.ddl;
 
+import org.apache.flink.sql.parser.SqlProperty;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn;
 import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.sql.parser.hive.impl.ParseException;
@@ -27,6 +28,8 @@ import org.apache.flink.sql.parser.type.SqlMapTypeNameSpec;
 import org.apache.flink.table.catalog.config.CatalogConfig;
 
 import org.apache.calcite.sql.SqlBasicTypeNameSpec;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
@@ -34,6 +37,9 @@ import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlTypeNameSpec;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.util.SqlShuttle;
+import org.apache.calcite.util.NlsString;
+import org.apache.commons.lang3.StringEscapeUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -70,6 +76,8 @@ public class HiveDDLUtils {
 	private static final Set<String> RESERVED_TABLE_PROPERTIES = new HashSet<>();
 	private static final List<String> RESERVED_TABLE_PROP_PREFIX = new ArrayList<>();
 
+	private static final UnescapeStringLiteralShuttle UNESCAPE_SHUTTLE = new UnescapeStringLiteralShuttle();
+
 	static {
 		RESERVED_DB_PROPERTIES.addAll(Arrays.asList(ALTER_DATABASE_OP, DATABASE_LOCATION_URI));
 
@@ -317,4 +325,69 @@ public class HiveDDLUtils {
 				column.getParserPosition()
 		);
 	}
+
+	// the input of sql-client will escape '\', unescape it so that users can write hive dialect
+	public static void unescapeProperties(SqlNodeList properties) {
+		if (properties != null) {
+			properties.accept(UNESCAPE_SHUTTLE);
+		}
+	}
+
+	public static SqlCharStringLiteral unescapeStringLiteral(SqlCharStringLiteral literal) {
+		if (literal != null) {
+			return (SqlCharStringLiteral) literal.accept(UNESCAPE_SHUTTLE);
+		}
+		return null;
+	}
+
+	public static void unescapePartitionSpec(SqlNodeList partSpec) {
+		if (partSpec != null) {
+			partSpec.accept(UNESCAPE_SHUTTLE);
+		}
+	}
+
+	private static class UnescapeStringLiteralShuttle extends SqlShuttle {
+
+		@Override
+		public SqlNode visit(SqlNodeList nodeList) {
+			for (int i = 0; i < nodeList.size(); i++) {
+				SqlNode unescaped = nodeList.get(i).accept(this);
+				nodeList.set(i, unescaped);
+			}
+			return nodeList;
+		}
+
+		@Override
+		public SqlNode visit(SqlCall call) {
+			if (call instanceof SqlProperty) {
+				SqlProperty property = (SqlProperty) call;
+				Comparable comparable = SqlLiteral.value(property.getValue());
+				if (comparable instanceof NlsString) {
+					String val = StringEscapeUtils.unescapeJava(((NlsString) comparable).getValue());
+					return new SqlProperty(
+							property.getKey(),
+							SqlLiteral.createCharString(val, property.getValue().getParserPosition()),
+							property.getParserPosition());
+				}
+			} else if (call instanceof SqlTableOption) {
+				SqlTableOption option = (SqlTableOption) call;
+				String key = StringEscapeUtils.unescapeJava(option.getKeyString());
+				String val = StringEscapeUtils.unescapeJava(option.getValueString());
+				SqlNode keyNode = SqlLiteral.createCharString(key, option.getKey().getParserPosition());
+				SqlNode valNode = SqlLiteral.createCharString(val, option.getValue().getParserPosition());
+				return new SqlTableOption(keyNode, valNode, option.getParserPosition());
+			}
+			return call;
+		}
+
+		@Override
+		public SqlNode visit(SqlLiteral literal) {
+			if (literal instanceof SqlCharStringLiteral) {
+				SqlCharStringLiteral stringLiteral = (SqlCharStringLiteral) literal;
+				String unescaped = StringEscapeUtils.unescapeJava(stringLiteral.getNlsString().getValue());
+				return SqlLiteral.createCharString(unescaped, stringLiteral.getParserPosition());
+			}
+			return literal;
+		}
+	}
 }
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java
index d003f6e..c14a749 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java
@@ -43,6 +43,9 @@ public class SqlAddHivePartitions extends SqlAddPartitions {
 	public SqlAddHivePartitions(SqlParserPos pos, SqlIdentifier tableName, boolean ifNotExists,
 			List<SqlNodeList> partSpecs, List<SqlCharStringLiteral> partLocations) {
 		super(pos, tableName, ifNotExists, partSpecs, toProps(partLocations));
+		for (SqlNodeList spec : partSpecs) {
+			HiveDDLUtils.unescapePartitionSpec(spec);
+		}
 		this.partLocations = partLocations;
 	}
 
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabaseProps.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabaseProps.java
index 5f500ab..a0554a7 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabaseProps.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveDatabaseProps.java
@@ -34,6 +34,7 @@ public class SqlAlterHiveDatabaseProps extends SqlAlterHiveDatabase {
 	public SqlAlterHiveDatabaseProps(SqlParserPos pos, SqlIdentifier databaseName, SqlNodeList propertyList)
 			throws ParseException {
 		super(pos, databaseName, HiveDDLUtils.checkReservedDBProperties(propertyList));
+		HiveDDLUtils.unescapeProperties(getPropertyList());
 	}
 
 	@Override
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHivePartitionRename.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHivePartitionRename.java
index 68d6461..8757094 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHivePartitionRename.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHivePartitionRename.java
@@ -45,6 +45,8 @@ public class SqlAlterHivePartitionRename extends SqlAlterTable {
 		if (partSpec == null || newPartSpec == null) {
 			throw new ParseException("Both old and new partition spec have to be specified");
 		}
+		HiveDDLUtils.unescapePartitionSpec(partSpec);
+		HiveDDLUtils.unescapePartitionSpec(newPartSpec);
 		this.newPartSpec = newPartSpec;
 	}
 
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableProps.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableProps.java
index 324d588..c458d8f 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableProps.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableProps.java
@@ -38,6 +38,7 @@ public class SqlAlterHiveTableProps extends SqlAlterHiveTable {
 	public SqlAlterHiveTableProps(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList)
 			throws ParseException {
 		super(CHANGE_TBL_PROPS, pos, tableName, null, HiveDDLUtils.checkReservedTableProperties(propertyList));
+		HiveDDLUtils.unescapeProperties(propertyList);
 		// remove the last property which is the ALTER_TABLE_OP
 		this.origProps = new SqlNodeList(propertyList.getList().subList(0, propertyList.size() - 1),
 				propertyList.getParserPosition());
@@ -48,7 +49,7 @@ public class SqlAlterHiveTableProps extends SqlAlterHiveTable {
 		super.unparse(writer, leftPrec, rightPrec);
 		writer.keyword("SET TBLPROPERTIES");
 		SqlWriter.Frame withFrame = writer.startList("(", ")");
-		for (SqlNode property : getPropertyList()) {
+		for (SqlNode property : origProps) {
 			printIndent(writer);
 			property.unparse(writer, leftPrec, rightPrec);
 		}
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableSerDe.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableSerDe.java
index f9ce8d2..0e18fe5 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableSerDe.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveTableSerDe.java
@@ -42,6 +42,7 @@ public class SqlAlterHiveTableSerDe extends SqlAlterHiveTable {
 	public SqlAlterHiveTableSerDe(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList partitionSpec,
 			SqlNodeList propertyList, SqlCharStringLiteral serdeLib) throws ParseException {
 		super(CHANGE_SERDE_PROPS, pos, tableName, partitionSpec, HiveDDLUtils.checkReservedTableProperties(propertyList));
+		HiveDDLUtils.unescapeProperties(propertyList);
 		// remove the last property which is the ALTER_TABLE_OP
 		origSerDeProps = new SqlNodeList(propertyList.getList().subList(0, propertyList.size() - 1),
 				propertyList.getParserPosition());
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveViewProperties.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveViewProperties.java
index d8d29a1..8e0cb16 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveViewProperties.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAlterHiveViewProperties.java
@@ -35,6 +35,7 @@ public class SqlAlterHiveViewProperties extends SqlAlterViewProperties {
 
 	public SqlAlterHiveViewProperties(SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList) {
 		super(pos, tableName, propertyList);
+		HiveDDLUtils.unescapeProperties(propertyList);
 	}
 
 	@Override
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveDatabase.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveDatabase.java
index d7e7bd5..474abce 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveDatabase.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveDatabase.java
@@ -43,7 +43,13 @@ public class SqlCreateHiveDatabase extends SqlCreateDatabase {
 
 	public SqlCreateHiveDatabase(SqlParserPos pos, SqlIdentifier databaseName, SqlNodeList propertyList,
 			SqlCharStringLiteral comment, SqlCharStringLiteral location, boolean ifNotExists) throws ParseException {
-		super(pos, databaseName, HiveDDLUtils.checkReservedDBProperties(propertyList), comment, ifNotExists);
+		super(
+				pos,
+				databaseName,
+				HiveDDLUtils.checkReservedDBProperties(propertyList),
+				HiveDDLUtils.unescapeStringLiteral(comment),
+				ifNotExists
+		);
 		HiveDDLUtils.ensureNonGeneric(propertyList);
 		originPropList = new SqlNodeList(propertyList.getList(), propertyList.getParserPosition());
 		// mark it as a hive database
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java
index 3fcec91..ee03b2e 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java
@@ -64,9 +64,19 @@ public class SqlCreateHiveTable extends SqlCreateTable {
 			SqlNodeList partColList, @Nullable SqlCharStringLiteral comment, boolean isTemporary, boolean isExternal,
 			HiveTableRowFormat rowFormat, HiveTableStoredAs storedAs, SqlCharStringLiteral location) throws ParseException {
 
-		super(pos, tableName, columnList, creationContext.constraints,
-				HiveDDLUtils.checkReservedTableProperties(propertyList), extractPartColIdentifiers(partColList), null,
-				comment, null, isTemporary);
+		super(
+				pos,
+				tableName,
+				columnList,
+				creationContext.constraints,
+				HiveDDLUtils.checkReservedTableProperties(propertyList),
+				extractPartColIdentifiers(partColList),
+				null,
+				HiveDDLUtils.unescapeStringLiteral(comment),
+				null,
+				isTemporary
+		);
+		HiveDDLUtils.unescapeProperties(propertyList);
 
 		this.origColList = HiveDDLUtils.deepCopyColList(columnList);
 		this.origPartColList = partColList != null ?
@@ -465,6 +475,7 @@ public class SqlCreateHiveTable extends SqlCreateTable {
 					list.add(HiveDDLUtils.toTableOption(prop, delimitPropToValue.get(prop), pos));
 				}
 			}
+			HiveDDLUtils.unescapeProperties(list);
 			return list;
 		}
 
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveView.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveView.java
index 323ca85..b6f0e78 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveView.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveView.java
@@ -42,7 +42,18 @@ public class SqlCreateHiveView extends SqlCreateView {
 
 	public SqlCreateHiveView(SqlParserPos pos, SqlIdentifier viewName, SqlNodeList fieldList, SqlNode query,
 			boolean ifNotExists, SqlCharStringLiteral comment, SqlNodeList properties) {
-		super(pos, viewName, fieldList, query, false, false, ifNotExists, comment, properties);
+		super(
+				pos,
+				viewName,
+				fieldList,
+				query,
+				false,
+				false,
+				ifNotExists,
+				HiveDDLUtils.unescapeStringLiteral(comment),
+				properties
+		);
+		HiveDDLUtils.unescapeProperties(properties);
 		originPropList = new SqlNodeList(properties.getList(), properties.getParserPosition());
 		// mark it as a hive view
 		properties.add(HiveDDLUtils.toTableOption(CatalogConfig.IS_GENERIC, "false", pos));
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/dml/RichSqlHiveInsert.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/dml/RichSqlHiveInsert.java
index b6a0dcb..eb1793c 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/dml/RichSqlHiveInsert.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/dml/RichSqlHiveInsert.java
@@ -20,6 +20,7 @@ package org.apache.flink.sql.parser.hive.dml;
 
 import org.apache.flink.sql.parser.SqlProperty;
 import org.apache.flink.sql.parser.dml.RichSqlInsert;
+import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils;
 
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
@@ -48,6 +49,7 @@ public class RichSqlHiveInsert extends RichSqlInsert {
 			SqlNodeList staticPartitions,
 			SqlNodeList allPartKeys) {
 		super(pos, keywords, extendedKeywords, targetTable, source, columnList, staticPartitions);
+		HiveDDLUtils.unescapePartitionSpec(staticPartitions);
 		this.allPartKeys = allPartKeys;
 		partKeyToSpec = getPartKeyToSpec(staticPartitions);
 	}
diff --git a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
index 7130713..fd11047 100644
--- a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java
@@ -278,8 +278,7 @@ public class FlinkHiveSqlParserImplTest extends SqlParserTest {
 						")");
 		sql("alter table tbl set serdeproperties('line.delim'='\n')")
 				.ok("ALTER TABLE `TBL` SET SERDEPROPERTIES (\n" +
-						"  'line.delim' = '\n" +
-						"'\n" +
+						"  'line.delim' = '\n'\n" +
 						")");
 	}
 
@@ -301,6 +300,15 @@ public class FlinkHiveSqlParserImplTest extends SqlParserTest {
 						"PARTITION (`P` = 2)");
 	}
 
+	@Test
+	public void testAlterTableProperties() {
+		sql("alter table tbl set tblproperties('k1'='v1','k2'='v2')")
+				.ok("ALTER TABLE `TBL` SET TBLPROPERTIES (\n" +
+						"  'k1' = 'v1',\n" +
+						"  'k2' = 'v2'\n" +
+						")");
+	}
+
 	// TODO: support EXCHANGE PARTITION, RECOVER PARTITIONS
 
 	// TODO: support (UN)ARCHIVE PARTITION