Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/31 02:15:16 UTC

[flink] 02/13: [FLINK-14338][sql-parser] Bump sql parser Calcite dependency to 1.22.0

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 78499853bff8eccc2b174a883a7cfdc4125eb499
Author: yuzhao.cyz <yu...@gmail.com>
AuthorDate: Tue Mar 17 19:00:55 2020 +0800

    [FLINK-14338][sql-parser] Bump sql parser Calcite dependency to 1.22.0
    
    * Add JUnit 5 support for tests
    * Move classes from package calcite.sql to flink.sql.parser.type
    * Remove ExtendedSqlBasicTypeNameSpec, use SqlAlienSystemTypeNameSpec instead
    * In Parser.tdd, re-organize the imports into alphabetical order, remove the useless trailing commas, and extend nonReservedKeywordsToAdd
    * Fix the JavaCC compile warnings
---
 .../src/main/codegen/data/Parser.tdd               | 201 ++++++-----
 .../src/main/codegen/includes/parserImpls.ftl      |  19 +-
 .../java/org/apache/calcite/sql/package-info.java  |  27 --
 .../flink/sql/parser/ddl/SqlCreateTable.java       |  18 +-
 .../parser/type/ExtendedSqlBasicTypeNameSpec.java  |  57 ---
 .../parser/type}/ExtendedSqlRowTypeNameSpec.java   |   8 +-
 .../sql/parser/type}/SqlMapTypeNameSpec.java       |   6 +-
 .../sql/parser/validate/FlinkSqlConformance.java   |  10 +
 .../flink/sql/parser/FlinkDDLDataTypeTest.java     |   4 +-
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 392 +++++++++++----------
 10 files changed, 365 insertions(+), 377 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 1783a5e..14158c7 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -21,69 +21,70 @@
 
   # List of additional classes and packages to import.
   # Example. "org.apache.calcite.sql.*", "java.util.List".
+  # Please keep the import classes in alphabetical order if new class is added.
   imports: [
-    "org.apache.calcite.sql.ExtendedSqlRowTypeNameSpec",
-    "org.apache.calcite.sql.SqlMapTypeNameSpec",
-    "org.apache.flink.sql.parser.ddl.SqlCreateTable",
+    "org.apache.flink.sql.parser.ddl.SqlAlterDatabase"
+    "org.apache.flink.sql.parser.ddl.SqlAlterFunction"
+    "org.apache.flink.sql.parser.ddl.SqlAlterTable"
+    "org.apache.flink.sql.parser.ddl.SqlAlterTableProperties"
+    "org.apache.flink.sql.parser.ddl.SqlAlterTableRename"
+    "org.apache.flink.sql.parser.ddl.SqlCreateCatalog"
+    "org.apache.flink.sql.parser.ddl.SqlCreateDatabase"
+    "org.apache.flink.sql.parser.ddl.SqlCreateFunction"
+    "org.apache.flink.sql.parser.ddl.SqlCreateTable"
+    "org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext"
+    "org.apache.flink.sql.parser.ddl.SqlCreateView"
+    "org.apache.flink.sql.parser.ddl.SqlDropDatabase"
+    "org.apache.flink.sql.parser.ddl.SqlDropFunction"
     "org.apache.flink.sql.parser.ddl.SqlDropTable"
-    "org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext",
-    "org.apache.flink.sql.parser.ddl.SqlCreateView",
-    "org.apache.flink.sql.parser.ddl.SqlDropView",
-    "org.apache.flink.sql.parser.ddl.SqlTableColumn",
-    "org.apache.flink.sql.parser.ddl.SqlTableOption",
-    "org.apache.flink.sql.parser.ddl.SqlWatermark",
-    "org.apache.flink.sql.parser.ddl.SqlUseCatalog",
-    "org.apache.flink.sql.parser.ddl.SqlUseDatabase",
-    "org.apache.flink.sql.parser.ddl.SqlCreateDatabase",
-    "org.apache.flink.sql.parser.ddl.SqlDropDatabase",
-    "org.apache.flink.sql.parser.ddl.SqlAlterDatabase",
-    "org.apache.flink.sql.parser.ddl.SqlAlterFunction",
-    "org.apache.flink.sql.parser.ddl.SqlCreateFunction",
-    "org.apache.flink.sql.parser.ddl.SqlDropFunction",
-    "org.apache.flink.sql.parser.ddl.SqlAlterTable",
-    "org.apache.flink.sql.parser.ddl.SqlAlterTableRename",
-    "org.apache.flink.sql.parser.ddl.SqlAlterTableProperties",
-    "org.apache.flink.sql.parser.ddl.SqlCreateCatalog",
-    "org.apache.flink.sql.parser.dml.RichSqlInsert",
-    "org.apache.flink.sql.parser.dml.RichSqlInsertKeyword",
-    "org.apache.flink.sql.parser.dql.SqlShowCatalogs",
-    "org.apache.flink.sql.parser.dql.SqlDescribeCatalog",
-    "org.apache.flink.sql.parser.dql.SqlShowDatabases",
-    "org.apache.flink.sql.parser.dql.SqlShowFunctions",
-    "org.apache.flink.sql.parser.dql.SqlDescribeDatabase",
-    "org.apache.flink.sql.parser.dql.SqlShowTables",
-    "org.apache.flink.sql.parser.dql.SqlRichDescribeTable",
-    "org.apache.flink.sql.parser.type.ExtendedSqlBasicTypeNameSpec",
-    "org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec",
-    "org.apache.flink.sql.parser.utils.SqlTimeUnit",
-    "org.apache.flink.sql.parser.utils.ParserResource",
-    "org.apache.flink.sql.parser.validate.FlinkSqlConformance",
-    "org.apache.flink.sql.parser.SqlProperty",
-    "org.apache.calcite.sql.SqlDrop",
-    "org.apache.calcite.sql.SqlCreate",
-    "java.util.List",
+    "org.apache.flink.sql.parser.ddl.SqlDropView"
+    "org.apache.flink.sql.parser.ddl.SqlTableColumn"
+    "org.apache.flink.sql.parser.ddl.SqlTableOption"
+    "org.apache.flink.sql.parser.ddl.SqlUseCatalog"
+    "org.apache.flink.sql.parser.ddl.SqlUseDatabase"
+    "org.apache.flink.sql.parser.ddl.SqlWatermark"
+    "org.apache.flink.sql.parser.dml.RichSqlInsert"
+    "org.apache.flink.sql.parser.dml.RichSqlInsertKeyword"
+    "org.apache.flink.sql.parser.dql.SqlDescribeCatalog"
+    "org.apache.flink.sql.parser.dql.SqlDescribeDatabase"
+    "org.apache.flink.sql.parser.dql.SqlShowCatalogs"
+    "org.apache.flink.sql.parser.dql.SqlShowDatabases"
+    "org.apache.flink.sql.parser.dql.SqlShowFunctions"
+    "org.apache.flink.sql.parser.dql.SqlShowTables"
+    "org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
+    "org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
+    "org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
+    "org.apache.flink.sql.parser.type.SqlMapTypeNameSpec"
+    "org.apache.flink.sql.parser.utils.SqlTimeUnit"
+    "org.apache.flink.sql.parser.utils.ParserResource"
+    "org.apache.flink.sql.parser.validate.FlinkSqlConformance"
+    "org.apache.flink.sql.parser.SqlProperty"
+    "org.apache.calcite.sql.SqlAlienSystemTypeNameSpec"
+    "org.apache.calcite.sql.SqlCreate"
+    "org.apache.calcite.sql.SqlDrop"
+    "java.util.List"
     "java.util.ArrayList"
   ]
 
   # List of new keywords. Example: "DATABASES", "TABLES". If the keyword is not a reserved
   # keyword, please also add it to 'nonReservedKeywords' section.
   keywords: [
-    "COMMENT",
-    "PARTITIONED",
-    "IF",
-    "WATERMARK",
-    "OVERWRITE",
-    "STRING",
-    "BYTES",
-    "RAW",
-    "CATALOGS",
-    "USE",
-    "DATABASES",
-    "FUNCTIONS",
-    "EXTENDED",
-    "SCALA",
-    "TABLES",
+    "BYTES"
+    "CATALOGS"
+    "COMMENT"
+    "DATABASES"
+    "EXTENDED"
+    "FUNCTIONS"
+    "IF"
+    "OVERWRITE"
+    "PARTITIONED"
+    "RAW"
     "RENAME"
+    "SCALA"
+    "STRING"
+    "TABLES"
+    "USE"
+    "WATERMARK"
   ]
 
   # List of keywords from "keywords" section that are not reserved.
@@ -109,15 +110,14 @@
     "C"
     "CASCADE"
     "CATALOG"
-    "CATALOGS"
     "CATALOG_NAME"
     "CENTURY"
     "CHAIN"
+    "CHARACTERISTICS"
+    "CHARACTERS"
     "CHARACTER_SET_CATALOG"
     "CHARACTER_SET_NAME"
     "CHARACTER_SET_SCHEMA"
-    "CHARACTERISTICS"
-    "CHARACTERS"
     "CLASS_ORIGIN"
     "COBOL"
     "COLLATION"
@@ -128,22 +128,22 @@
     "COMMAND_FUNCTION"
     "COMMAND_FUNCTION_CODE"
     "COMMITTED"
-    "CONDITION_NUMBER"
     "CONDITIONAL"
+    "CONDITION_NUMBER"
     "CONNECTION"
     "CONNECTION_NAME"
     "CONSTRAINT_CATALOG"
     "CONSTRAINT_NAME"
-    "CONSTRAINT_SCHEMA"
     "CONSTRAINTS"
+    "CONSTRAINT_SCHEMA"
     "CONSTRUCTOR"
     "CONTINUE"
     "CURSOR_NAME"
     "DATA"
     "DATABASE"
-    "DATABASES"
     "DATETIME_INTERVAL_CODE"
     "DATETIME_INTERVAL_PRECISION"
+    "DAYS"
     "DECADE"
     "DEFAULTS"
     "DEFERRABLE"
@@ -169,7 +169,6 @@
     "EXCEPTION"
     "EXCLUDE"
     "EXCLUDING"
-    "EXTENDED"
     "FINAL"
     "FIRST"
     "FOLLOWING"
@@ -177,7 +176,6 @@
     "FORTRAN"
     "FOUND"
     "FRAC_SECOND"
-    "FUNCTIONS"
     "G"
     "GENERAL"
     "GENERATED"
@@ -186,6 +184,7 @@
     "GOTO"
     "GRANTED"
     "HIERARCHY"
+    "HOURS"
     "IGNORE"
     "IMMEDIATE"
     "IMMEDIATELY"
@@ -198,8 +197,8 @@
     "INSTANTIABLE"
     "INVOKER"
     "ISODOW"
-    "ISOYEAR"
     "ISOLATION"
+    "ISOYEAR"
     "JAVA"
     "JSON"
     "K"
@@ -213,15 +212,18 @@
     "LIBRARY"
     "LOCATOR"
     "M"
+    "MAP"
     "MATCHED"
     "MAXVALUE"
-    "MICROSECOND"
     "MESSAGE_LENGTH"
     "MESSAGE_OCTET_LENGTH"
     "MESSAGE_TEXT"
-    "MILLISECOND"
+    "MICROSECOND"
     "MILLENNIUM"
+    "MILLISECOND"
+    "MINUTES"
     "MINVALUE"
+    "MONTHS"
     "MORE_"
     "MUMPS"
     "NAME"
@@ -265,7 +267,6 @@
     "QUARTER"
     "READ"
     "RELATIVE"
-    "RENAME"
     "REPEATABLE"
     "REPLACE"
     "RESPECT"
@@ -289,6 +290,7 @@
     "SCOPE_CATALOGS"
     "SCOPE_NAME"
     "SCOPE_SCHEMA"
+    "SECONDS"
     "SECTION"
     "SECURITY"
     "SELF"
@@ -329,8 +331,8 @@
     "SQL_INTERVAL_YEAR"
     "SQL_INTERVAL_YEAR_TO_MONTH"
     "SQL_LONGVARBINARY"
-    "SQL_LONGVARNCHAR"
     "SQL_LONGVARCHAR"
+    "SQL_LONGVARNCHAR"
     "SQL_NCHAR"
     "SQL_NCLOB"
     "SQL_NUMERIC"
@@ -358,7 +360,6 @@
     "STYLE"
     "SUBCLASS_ORIGIN"
     "SUBSTITUTE"
-    "TABLES"
     "TABLE_NAME"
     "TEMPORARY"
     "TIES"
@@ -374,6 +375,7 @@
     "TRIGGER_CATALOG"
     "TRIGGER_NAME"
     "TRIGGER_SCHEMA"
+    "TUMBLE"
     "TYPE"
     "UNBOUNDED"
     "UNCOMMITTED"
@@ -381,45 +383,54 @@
     "UNDER"
     "UNNAMED"
     "USAGE"
-    "USE"
     "USER_DEFINED_TYPE_CATALOG"
     "USER_DEFINED_TYPE_CODE"
     "USER_DEFINED_TYPE_NAME"
     "USER_DEFINED_TYPE_SCHEMA"
-    "UTF8"
     "UTF16"
     "UTF32"
+    "UTF8"
     "VERSION"
     "VIEW"
     "WEEK"
-    "WRAPPER"
     "WORK"
+    "WRAPPER"
     "WRITE"
     "XML"
-    "ZONE",
+    "YEARS"
+    "ZONE"
+  ]
 
+  # List of non-reserved keywords to add;
+  # items in this list become non-reserved
+  nonReservedKeywordsToAdd: [
     # not in core, added in Flink
-    "PARTITIONED",
-    "IF",
+    "IF"
     "OVERWRITE"
+    "PARTITIONED"
+  ]
+
+  # List of non-reserved keywords to remove;
+  # items in this list become reserved
+  nonReservedKeywordsToRemove: [
   ]
 
   # List of methods for parsing custom SQL statements.
   # Return type of method implementation should be 'SqlNode'.
   # Example: SqlShowDatabases(), SqlShowTables().
   statementParserMethods: [
-    "RichSqlInsert()",
-    "SqlShowCatalogs()",
-    "SqlDescribeCatalog()",
-    "SqlUseCatalog()",
-    "SqlShowDatabases()",
-    "SqlUseDatabase()",
-    "SqlAlterDatabase()",
-    "SqlDescribeDatabase()",
-    "SqlAlterFunction()",
-    "SqlShowFunctions()",
-    "SqlShowTables()",
-    "SqlRichDescribeTable()",
+    "RichSqlInsert()"
+    "SqlShowCatalogs()"
+    "SqlDescribeCatalog()"
+    "SqlUseCatalog()"
+    "SqlShowDatabases()"
+    "SqlUseDatabase()"
+    "SqlAlterDatabase()"
+    "SqlDescribeDatabase()"
+    "SqlAlterFunction()"
+    "SqlShowFunctions()"
+    "SqlShowTables()"
+    "SqlRichDescribeTable()"
     "SqlAlterTable()"
   ]
 
@@ -433,9 +444,9 @@
   # Return type of method implementation should be "SqlTypeNameSpec".
   # Example: SqlParseTimeStampZ().
   dataTypeParserMethods: [
-    "ExtendedSqlBasicTypeName()",
-    "CustomizedCollectionsTypeName()",
-    "SqlMapTypeName()",
+    "ExtendedSqlBasicTypeName()"
+    "CustomizedCollectionsTypeName()"
+    "SqlMapTypeName()"
     "ExtendedSqlRowTypeName()"
   ]
 
@@ -454,18 +465,18 @@
   # List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
   # Each must accept arguments "(SqlParserPos pos, boolean replace)".
   createStatementParserMethods: [
-    "SqlCreateCatalog",
-    "SqlCreateTable",
-    "SqlCreateView",
-    "SqlCreateDatabase",
+    "SqlCreateCatalog"
+    "SqlCreateTable"
+    "SqlCreateView"
+    "SqlCreateDatabase"
     "SqlCreateFunction"
   ]
 
   # List of methods for parsing extensions to "DROP" calls.
   # Each must accept arguments "(Span s)".
   dropStatementParserMethods: [
-    "SqlDropTable",
-    "SqlDropView",
+    "SqlDropTable"
+    "SqlDropView"
     "SqlDropDatabase"
     "SqlDropFunction"
   ]
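
A side note on the list syntax: Parser.tdd is an FMPP/TDD data file, so list
entries may be separated by newlines alone, and the trailing commas removed
above were never required. After this change, adding a new non-reserved
keyword means touching two alphabetically ordered lists; a hedged sketch
(the keyword "MYWORD" is hypothetical):

  keywords: [
    # ... existing entries, alphabetical, no trailing commas ...
    "MYWORD"
  ]

  nonReservedKeywordsToAdd: [
    "IF"
    "MYWORD"   # hypothetical; keeps the word usable as an identifier
    "OVERWRITE"
    "PARTITIONED"
  ]
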
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index a93ef66..96a25a4 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -119,7 +119,10 @@ SqlCreate SqlCreateDatabase(Span s, boolean replace) :
 }
 {
     <DATABASE> { startPos = getPos(); }
-    [ <IF> <NOT> <EXISTS> { ifNotExists = true; } ]
+    [
+        LOOKAHEAD(3)
+        <IF> <NOT> <EXISTS> { ifNotExists = true; }
+    ]
     databaseName = CompoundIdentifier()
     [ <COMMENT> <QUOTED_STRING>
         {
@@ -217,7 +220,10 @@ SqlCreate SqlCreateFunction(Span s, boolean replace) :
 
     <FUNCTION>
 
-    [ <IF> <NOT> <EXISTS> { ifNotExists = true; } ]
+    [
+        LOOKAHEAD(3)
+        <IF> <NOT> <EXISTS> { ifNotExists = true; }
+    ]
 
     functionIdentifier = CompoundIdentifier()
 
@@ -248,12 +254,13 @@ SqlDrop SqlDropFunction(Span s, boolean replace) :
     boolean isSystemFunction = false;
 }
 {
-    [ <TEMPORARY> {isTemporary = true;}
+    [
+        <TEMPORARY> {isTemporary = true;}
         [  <SYSTEM>   { isSystemFunction = true; }  ]
     ]
     <FUNCTION>
 
-    [ <IF> <EXISTS> { ifExists = true; } ]
+    [ LOOKAHEAD(2) <IF> <EXISTS> { ifExists = true; } ]
 
     functionIdentifier = CompoundIdentifier()
 
@@ -281,7 +288,7 @@ SqlAlterFunction SqlAlterFunction() :
 
     <FUNCTION> { startPos = getPos(); }
 
-    [ <IF> <EXISTS> { ifExists = true; } ]
+    [ LOOKAHEAD(2) <IF> <EXISTS> { ifExists = true; } ]
 
     functionIdentifier = CompoundIdentifier()
 
@@ -806,7 +813,7 @@ SqlTypeNameSpec ExtendedSqlBasicTypeName() :
         }
     )
     {
-        return new ExtendedSqlBasicTypeNameSpec(typeAlias, typeName, precision, getPos());
+        return new SqlAlienSystemTypeNameSpec(typeAlias, typeName, precision, getPos());
     }
 }
 
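A word on the LOOKAHEAD hints added above: JavaCC commits to a grammar choice
after one token by default, and because IF is a non-reserved keyword it can
start either the optional "IF NOT EXISTS" clause or the object identifier
itself (an object may literally be named "if"). LOOKAHEAD(3) (and LOOKAHEAD(2)
for "IF EXISTS") lets the generated parser peek far enough to disambiguate,
which also silences the JavaCC choice-conflict warnings this commit targets.
A minimal, hedged Java sketch of the resulting behavior (the statements are
illustrative, not taken from this patch; both calls throw SqlParseException
on failure):

  import org.apache.calcite.sql.parser.SqlParser;
  import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;

  SqlParser.Config config = SqlParser.configBuilder()
      .setParserFactory(FlinkSqlParserImpl.FACTORY)
      .build();
  // Optional branch taken: full IF NOT EXISTS clause.
  SqlParser.create("CREATE DATABASE IF NOT EXISTS db1", config).parseStmt();
  // Optional branch skipped: a database literally named "if"; only the
  // three-token lookahead tells this apart from the clause above.
  SqlParser.create("CREATE DATABASE if", config).parseStmt();
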
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/package-info.java b/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/package-info.java
deleted file mode 100644
index 439912b..0000000
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/package-info.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package is needed because the constructor of SqlTypeNameSpec is package scope,
- * we should merge this package into org.apache.flink.sql.parser.type when CALCITE-3360
- * is resolved.
- */
-@PackageMarker
-package org.apache.calcite.sql;
-
-import org.apache.calcite.avatica.util.PackageMarker;
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index c4c7f11..19839a0 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -20,8 +20,8 @@ package org.apache.flink.sql.parser.ddl;
 
 import org.apache.flink.sql.parser.ExtendedSqlNode;
 import org.apache.flink.sql.parser.error.SqlValidateException;
+import org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec;
 
-import org.apache.calcite.sql.ExtendedSqlRowTypeNameSpec;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlCharStringLiteral;
@@ -69,10 +69,8 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
 
 	private final SqlNodeList partitionKeyList;
 
-	@Nullable
 	private final SqlWatermark watermark;
 
-	@Nullable
 	private final SqlCharStringLiteral comment;
 
 	public SqlCreateTable(
@@ -83,8 +81,8 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
 			List<SqlNodeList> uniqueKeysList,
 			SqlNodeList propertyList,
 			SqlNodeList partitionKeyList,
-			SqlWatermark watermark,
-			SqlCharStringLiteral comment) {
+			@Nullable SqlWatermark watermark,
+			@Nullable SqlCharStringLiteral comment) {
 		super(OPERATOR, pos, false, false);
 		this.tableName = requireNonNull(tableName, "tableName should not be null");
 		this.columnList = requireNonNull(columnList, "columnList should not be null");
@@ -219,10 +217,12 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
 	 * have been reversed.
 	 */
 	public String getColumnSqlString() {
-		SqlPrettyWriter writer = new SqlPrettyWriter(AnsiSqlDialect.DEFAULT);
-		writer.setAlwaysUseParentheses(true);
-		writer.setSelectListItemsOnSeparateLines(false);
-		writer.setIndentation(0);
+		SqlPrettyWriter writer = new SqlPrettyWriter(
+			SqlPrettyWriter.config()
+				.withDialect(AnsiSqlDialect.DEFAULT)
+				.withAlwaysUseParentheses(true)
+				.withSelectListItemsOnSeparateLines(false)
+				.withIndentation(0));
 		writer.startList("", "");
 		for (SqlNode column : columnList) {
 			writer.sep(",");
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlBasicTypeNameSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlBasicTypeNameSpec.java
deleted file mode 100644
index ec81a0d..0000000
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlBasicTypeNameSpec.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.sql.parser.type;
-
-import org.apache.calcite.sql.SqlBasicTypeNameSpec;
-import org.apache.calcite.sql.SqlWriter;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * A sql type name specification of extended basic data type, it has a counterpart basic
- * sql type name but always represents as a special alias in Flink.
- *
- * <p>For example, STRING is synonym of VARCHAR(INT_MAX)
- * and BYTES is synonym of VARBINARY(INT_MAX).
- */
-public class ExtendedSqlBasicTypeNameSpec extends SqlBasicTypeNameSpec {
-	// Type alias used for unparsing.
-	private final String typeAlias;
-
-	/**
-	 * Creates a {@code ExtendedSqlBuiltinTypeNameSpec} instance.
-	 *
-	 * @param typeName  type name
-	 * @param precision type precision
-	 * @param pos       parser position
-	 */
-	public ExtendedSqlBasicTypeNameSpec(
-			String typeAlias,
-			SqlTypeName typeName,
-			int precision,
-			SqlParserPos pos) {
-		super(typeName, precision, pos);
-		this.typeAlias = typeAlias;
-	}
-
-	@Override
-	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-		writer.keyword(typeAlias);
-	}
-}
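
Nothing replaces this class one-to-one in the Flink code base; Calcite 1.22's
built-in SqlAlienSystemTypeNameSpec now covers the same use case, as the
parserImpls.ftl hunk above shows. A hedged sketch of the equivalent
construction for the STRING alias (VARCHAR(Integer.MAX_VALUE) per the removed
javadoc; SqlParserPos.ZERO stands in for a real parser position):

  import org.apache.calcite.sql.SqlAlienSystemTypeNameSpec;
  import org.apache.calcite.sql.SqlTypeNameSpec;
  import org.apache.calcite.sql.parser.SqlParserPos;
  import org.apache.calcite.sql.type.SqlTypeName;

  // Unparses as "STRING" but resolves to VARCHAR(Integer.MAX_VALUE).
  SqlTypeNameSpec stringType = new SqlAlienSystemTypeNameSpec(
      "STRING",              // type alias used for unparsing
      SqlTypeName.VARCHAR,   // underlying basic SQL type
      Integer.MAX_VALUE,     // precision
      SqlParserPos.ZERO);
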
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/ExtendedSqlRowTypeNameSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java
similarity index 93%
rename from flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/ExtendedSqlRowTypeNameSpec.java
rename to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java
index 42828d8..37e841b 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/ExtendedSqlRowTypeNameSpec.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java
@@ -16,10 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.calcite.sql;
+package org.apache.flink.sql.parser.type;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlRowTypeNameSpec;
+import org.apache.calcite.sql.SqlTypeNameSpec;
+import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.validate.SqlValidator;
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/SqlMapTypeNameSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapTypeNameSpec.java
similarity index 93%
rename from flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/SqlMapTypeNameSpec.java
rename to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapTypeNameSpec.java
index 0e22dbf..5725456 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/SqlMapTypeNameSpec.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapTypeNameSpec.java
@@ -16,9 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.calcite.sql;
+package org.apache.flink.sql.parser.type;
 
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlTypeNameSpec;
+import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.validate.SqlValidator;
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
index d53b050..cf32026 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
@@ -146,6 +146,16 @@ public enum FlinkSqlConformance implements SqlConformance {
 		return false;
 	}
 
+	@Override
+	public boolean allowPluralTimeUnits() {
+		return false;
+	}
+
+	@Override
+	public boolean allowQualifyingCommonColumn() {
+		return true;
+	}
+
 	/**
 	 * Whether to allow "create table T(i int, j int) partitioned by (i)" grammar.
 	 */
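
Both overrides implement methods that Calcite 1.22 added to SqlConformance.
As a hedged illustration of their effect on Flink SQL: with
allowPluralTimeUnits() returning false, interval literals must use singular
units, so

  SELECT ts - INTERVAL '3' SECOND FROM t    -- parses
  SELECT ts - INTERVAL '3' SECONDS FROM t   -- rejected

while allowQualifyingCommonColumn() returning true keeps it legal to qualify
a column that is common to both sides of a USING or NATURAL join:

  SELECT t1.a FROM t1 JOIN t2 USING (a)
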
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
index f5edc4f..5e0e0e5 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
@@ -44,6 +44,7 @@ import org.apache.calcite.sql.parser.SqlParserTest;
 import org.apache.calcite.sql.parser.SqlParserUtil;
 import org.apache.calcite.sql.pretty.SqlPrettyWriter;
 import org.apache.calcite.sql.test.SqlTestFactory;
+import org.apache.calcite.sql.test.SqlTests;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.validate.SqlConformance;
 import org.apache.calcite.sql.validate.SqlConformanceEnum;
@@ -51,7 +52,6 @@ import org.apache.calcite.sql.validate.SqlValidator;
 import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
 import org.apache.calcite.test.MockSqlOperatorTable;
-import org.apache.calcite.test.SqlValidatorTestCase;
 import org.apache.calcite.test.catalog.MockCatalogReaderSimple;
 import org.apache.calcite.util.SourceStringReader;
 import org.apache.calcite.util.Util;
@@ -439,7 +439,7 @@ public class FlinkDDLDataTypeTest {
 		private void checkEx(String expectedMsgPattern,
 				SqlParserUtil.StringAndPos sap,
 				Throwable thrown) {
-			SqlValidatorTestCase.checkEx(thrown, expectedMsgPattern, sap);
+			SqlTests.checkEx(thrown, expectedMsgPattern, sap, SqlTests.Stage.VALIDATE);
 		}
 	}
 
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 3057324..0509e91 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -37,6 +37,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.Reader;
+import java.util.function.UnaryOperator;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -51,9 +52,10 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 		return FlinkSqlParserImpl.FACTORY;
 	}
 
-	protected SqlParser getSqlParser(Reader source) {
+	protected SqlParser getSqlParser(Reader source,
+			UnaryOperator<SqlParser.ConfigBuilder> transform) {
 		if (conformance0 == null) {
-			return super.getSqlParser(source);
+			return super.getSqlParser(source, transform);
 		} else {
 			// overwrite the default sql conformance.
 			return SqlParser.create(source,
@@ -75,12 +77,12 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
 	@Test
 	public void testShowCatalogs() {
-		check("show catalogs", "SHOW CATALOGS");
+		sql("show catalogs").ok("SHOW CATALOGS");
 	}
 
 	@Test
 	public void testDescribeCatalog() {
-		check("describe catalog a", "DESCRIBE CATALOG `A`");
+		sql("describe catalog a").ok("DESCRIBE CATALOG `A`");
 	}
 
 	/**
@@ -92,7 +94,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
 	@Test
 	public void testUseCatalog() {
-		check("use catalog a", "USE CATALOG `A`");
+		sql("use catalog a").ok("USE CATALOG `A`");
 	}
 
 	@Test
@@ -112,91 +114,94 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
 	@Test
 	public void testShowDataBases() {
-		check("show databases", "SHOW DATABASES");
+		sql("show databases").ok("SHOW DATABASES");
 	}
 
 	@Test
 	public void testUseDataBase() {
-		check("use default_db", "USE `DEFAULT_DB`");
-		check("use defaultCatalog.default_db", "USE `DEFAULTCATALOG`.`DEFAULT_DB`");
+		sql("use default_db").ok("USE `DEFAULT_DB`");
+		sql("use defaultCatalog.default_db").ok("USE `DEFAULTCATALOG`.`DEFAULT_DB`");
 	}
 
 	@Test
 	public void testCreateDatabase() {
-		check("create database db1", "CREATE DATABASE `DB1`");
-		check("create database if not exists db1", "CREATE DATABASE IF NOT EXISTS `DB1`");
-		check("create database catalog1.db1", "CREATE DATABASE `CATALOG1`.`DB1`");
-		check("create database db1 comment 'test create database'",
-			"CREATE DATABASE `DB1`\n" +
-			"COMMENT 'test create database'");
-		check("create database db1 comment 'test create database'" +
-			"with ( 'key1' = 'value1', 'key2.a' = 'value2.a')",
-			"CREATE DATABASE `DB1`\n" +
-			"COMMENT 'test create database' WITH (\n" +
-			"  'key1' = 'value1',\n" +
-			"  'key2.a' = 'value2.a'\n" +
-			")");
+		sql("create database db1").ok("CREATE DATABASE `DB1`");
+		sql("create database if not exists db1").ok("CREATE DATABASE IF NOT EXISTS `DB1`");
+		sql("create database catalog1.db1").ok("CREATE DATABASE `CATALOG1`.`DB1`");
+		final String sql = "create database db1 comment 'test create database'";
+		final String expected = "CREATE DATABASE `DB1`\n"
+				+ "COMMENT 'test create database'";
+		sql(sql).ok(expected);
+		final String sql1 = "create database db1 comment 'test create database'"
+				+ "with ( 'key1' = 'value1', 'key2.a' = 'value2.a')";
+		final String expected1 = "CREATE DATABASE `DB1`\n"
+				+ "COMMENT 'test create database' WITH (\n"
+				+ "  'key1' = 'value1',\n"
+				+ "  'key2.a' = 'value2.a'\n"
+				+ ")";
+		sql(sql1).ok(expected1);
 	}
 
 	@Test
 	public void testDropDatabase() {
-		check("drop database db1", "DROP DATABASE `DB1` RESTRICT");
-		check("drop database catalog1.db1", "DROP DATABASE `CATALOG1`.`DB1` RESTRICT");
-		check("drop database db1 RESTRICT", "DROP DATABASE `DB1` RESTRICT");
-		check("drop database db1 CASCADE", "DROP DATABASE `DB1` CASCADE");
+		sql("drop database db1").ok("DROP DATABASE `DB1` RESTRICT");
+		sql("drop database catalog1.db1").ok("DROP DATABASE `CATALOG1`.`DB1` RESTRICT");
+		sql("drop database db1 RESTRICT").ok("DROP DATABASE `DB1` RESTRICT");
+		sql("drop database db1 CASCADE").ok("DROP DATABASE `DB1` CASCADE");
 	}
 
 	@Test
 	public void testAlterDatabase() {
-		check("alter database db1 set ('key1' = 'value1','key2.a' = 'value2.a')",
-			"ALTER DATABASE `DB1` SET (\n" +
-			"  'key1' = 'value1',\n" +
-			"  'key2.a' = 'value2.a'\n" +
-			")");
+		final String sql = "alter database db1 set ('key1' = 'value1','key2.a' = 'value2.a')";
+		final String expected = "ALTER DATABASE `DB1` SET (\n"
+				+ "  'key1' = 'value1',\n"
+				+ "  'key2.a' = 'value2.a'\n"
+				+ ")";
+		sql(sql).ok(expected);
 	}
 
 	@Test
 	public void testDescribeDatabase() {
-		check("describe database db1", "DESCRIBE DATABASE `DB1`");
-		check("describe database catlog1.db1", "DESCRIBE DATABASE `CATLOG1`.`DB1`");
-		check("describe database extended db1", "DESCRIBE DATABASE EXTENDED `DB1`");
+		sql("describe database db1").ok("DESCRIBE DATABASE `DB1`");
+		sql("describe database catlog1.db1").ok("DESCRIBE DATABASE `CATLOG1`.`DB1`");
+		sql("describe database extended db1").ok("DESCRIBE DATABASE EXTENDED `DB1`");
 	}
 
 	@Test
 	public void testAlterFunction() {
-		check("alter function function1 as 'org.apache.fink.function.function1'",
-			"ALTER FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
+		sql("alter function function1 as 'org.apache.fink.function.function1'")
+				.ok("ALTER FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
 
-		check("alter temporary function function1 as 'org.apache.fink.function.function1'",
-			"ALTER TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
+		sql("alter temporary function function1 as 'org.apache.fink.function.function1'")
+				.ok("ALTER TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
 
-		check("alter temporary function function1 as 'org.apache.fink.function.function1' language scala",
-			"ALTER TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE SCALA");
+		sql("alter temporary function function1 as 'org.apache.fink.function.function1' language scala")
+				.ok("ALTER TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE SCALA");
 
-		check ("alter temporary system function function1 as 'org.apache.fink.function.function1'",
-			"ALTER TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
+		sql("alter temporary system function function1 as 'org.apache.fink.function.function1'")
+				.ok("ALTER TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
 
-		check("alter temporary system function function1 as 'org.apache.fink.function.function1' language java",
-			"ALTER TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE JAVA");
+		sql("alter temporary system function function1 as 'org.apache.fink.function.function1' language java")
+				.ok("ALTER TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE JAVA");
 	}
 
 	@Test
 	public void testShowFuntions() {
-		check("show functions", "SHOW FUNCTIONS");
-		check("show functions db1", "SHOW FUNCTIONS `DB1`");
-		check("show functions catalog1.db1", "SHOW FUNCTIONS `CATALOG1`.`DB1`");
+		sql("show functions").ok("SHOW FUNCTIONS");
+		sql("show functions db1").ok("SHOW FUNCTIONS `DB1`");
+		sql("show functions catalog1.db1").ok("SHOW FUNCTIONS `CATALOG1`.`DB1`");
 	}
 
 	@Test
 	public void testShowTables() {
-		check("show tables", "SHOW TABLES");
+		sql("show tables").ok("SHOW TABLES");
 	}
 
 	@Test
 	public void testDescribeTable() {
-		check("describe tbl", "DESCRIBE `TBL`");
-		check("describe catlog1.db1.tbl", "DESCRIBE `CATLOG1`.`DB1`.`TBL`");
-		check("describe extended db1", "DESCRIBE EXTENDED `DB1`");
+		sql("describe tbl").ok("DESCRIBE `TBL`");
+		sql("describe catlog1.db1.tbl").ok("DESCRIBE `CATLOG1`.`DB1`.`TBL`");
+		sql("describe extended db1").ok("DESCRIBE EXTENDED `DB1`");
 	}
 
 	/**
@@ -208,18 +213,19 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
 	@Test
 	public void testAlterTable() {
-		check("alter table t1 rename to t2", "ALTER TABLE `T1` RENAME TO `T2`");
-		check("alter table c1.d1.t1 rename to t2", "ALTER TABLE `C1`.`D1`.`T1` RENAME TO `T2`");
-		check("alter table t1 set ('key1'='value1')",
-			"ALTER TABLE `T1` SET (\n" +
-			"  'key1' = 'value1'\n" +
-			")");
+		sql("alter table t1 rename to t2").ok("ALTER TABLE `T1` RENAME TO `T2`");
+		sql("alter table c1.d1.t1 rename to t2").ok("ALTER TABLE `C1`.`D1`.`T1` RENAME TO `T2`");
+		final String sql = "alter table t1 set ('key1'='value1')";
+		final String expected = "ALTER TABLE `T1` SET (\n"
+				+ "  'key1' = 'value1'\n"
+				+ ")";
+		sql(sql).ok(expected);
 	}
 
 	@Test
 	public void testCreateTable() {
 		conformance0 = FlinkSqlConformance.HIVE;
-		check("CREATE TABLE tbl1 (\n" +
+		final String sql = "CREATE TABLE tbl1 (\n" +
 				"  a bigint,\n" +
 				"  h varchar, \n" +
 				"  g as 2 * (a + 1), \n" +
@@ -232,8 +238,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"  with (\n" +
 				"    'connector' = 'kafka', \n" +
 				"    'kafka.topic' = 'log.test'\n" +
-				")\n",
-			"CREATE TABLE `TBL1` (\n" +
+				")\n";
+		final String expected = "CREATE TABLE `TBL1` (\n" +
 				"  `A`  BIGINT,\n" +
 				"  `H`  VARCHAR,\n" +
 				"  `G` AS (2 * (`A` + 1)),\n" +
@@ -246,13 +252,14 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"WITH (\n" +
 				"  'connector' = 'kafka',\n" +
 				"  'kafka.topic' = 'log.test'\n" +
-				")");
+				")";
+		sql(sql).ok(expected);
 	}
 
 	@Test
 	public void testCreateTableWithComment() {
 		conformance0 = FlinkSqlConformance.HIVE;
-		check("CREATE TABLE tbl1 (\n" +
+		final String sql = "CREATE TABLE tbl1 (\n" +
 				"  a bigint comment 'test column comment AAA.',\n" +
 				"  h varchar, \n" +
 				"  g as 2 * (a + 1), \n" +
@@ -266,8 +273,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"  with (\n" +
 				"    'connector' = 'kafka', \n" +
 				"    'kafka.topic' = 'log.test'\n" +
-				")\n",
-			"CREATE TABLE `TBL1` (\n" +
+				")\n";
+		final String expected = "CREATE TABLE `TBL1` (\n" +
 				"  `A`  BIGINT  COMMENT 'test column comment AAA.',\n" +
 				"  `H`  VARCHAR,\n" +
 				"  `G` AS (2 * (`A` + 1)),\n" +
@@ -281,13 +288,14 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"WITH (\n" +
 				"  'connector' = 'kafka',\n" +
 				"  'kafka.topic' = 'log.test'\n" +
-				")");
+				")";
+		sql(sql).ok(expected);
 	}
 
 	@Test
 	public void testCreateTableWithPrimaryKeyAndUniqueKey() {
 		conformance0 = FlinkSqlConformance.HIVE;
-		check("CREATE TABLE tbl1 (\n" +
+		final String sql = "CREATE TABLE tbl1 (\n" +
 				"  a bigint comment 'test column comment AAA.',\n" +
 				"  h varchar, \n" +
 				"  g as 2 * (a + 1), \n" +
@@ -302,8 +310,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"  with (\n" +
 				"    'connector' = 'kafka', \n" +
 				"    'kafka.topic' = 'log.test'\n" +
-				")\n",
-			"CREATE TABLE `TBL1` (\n" +
+				")\n";
+		final String expected = "CREATE TABLE `TBL1` (\n" +
 				"  `A`  BIGINT  COMMENT 'test column comment AAA.',\n" +
 				"  `H`  VARCHAR,\n" +
 				"  `G` AS (2 * (`A` + 1)),\n" +
@@ -318,12 +326,13 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 				"WITH (\n" +
 				"  'connector' = 'kafka',\n" +
 				"  'kafka.topic' = 'log.test'\n" +
-				")");
+				")";
+		sql(sql).ok(expected);
 	}
 
 	@Test
 	public void testCreateTableWithWatermark() {
-		String sql = "CREATE TABLE tbl1 (\n" +
+		final String sql = "CREATE TABLE tbl1 (\n" +
 			"  ts timestamp(3),\n" +
 			"  id varchar, \n" +
 			"  watermark FOR ts AS ts - interval '3' second\n" +
@@ -332,20 +341,20 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			"    'connector' = 'kafka', \n" +
 			"    'kafka.topic' = 'log.test'\n" +
 			")\n";
-		check(sql,
-			"CREATE TABLE `TBL1` (\n" +
+		final String expected = "CREATE TABLE `TBL1` (\n" +
 				"  `TS`  TIMESTAMP(3),\n" +
 				"  `ID`  VARCHAR,\n" +
 				"  WATERMARK FOR `TS` AS (`TS` - INTERVAL '3' SECOND)\n" +
 				") WITH (\n" +
 				"  'connector' = 'kafka',\n" +
 				"  'kafka.topic' = 'log.test'\n" +
-				")");
+				")";
+		sql(sql).ok(expected);
 	}
 
 	@Test
 	public void testCreateTableWithWatermarkOnComputedColumn() {
-		String sql = "CREATE TABLE tbl1 (\n" +
+		final String sql = "CREATE TABLE tbl1 (\n" +
 			"  log_ts varchar,\n" +
 			"  ts as to_timestamp(log_ts), \n" +
 			"  WATERMARK FOR ts AS ts + interval '1' second\n" +
@@ -354,34 +363,35 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			"    'connector' = 'kafka', \n" +
 			"    'kafka.topic' = 'log.test'\n" +
 			")\n";
-		check(sql,
-			"CREATE TABLE `TBL1` (\n" +
+		final String expected = "CREATE TABLE `TBL1` (\n" +
 				"  `LOG_TS`  VARCHAR,\n" +
 				"  `TS` AS `TO_TIMESTAMP`(`LOG_TS`),\n" +
 				"  WATERMARK FOR `TS` AS (`TS` + INTERVAL '1' SECOND)\n" +
 				") WITH (\n" +
 				"  'connector' = 'kafka',\n" +
 				"  'kafka.topic' = 'log.test'\n" +
-				")");
+				")";
+		sql(sql).ok(expected);
 	}
 
 	@Test
 	public void testCreateTableWithWatermarkOnNestedField() {
-		check("CREATE TABLE tbl1 (\n" +
+		final String sql = "CREATE TABLE tbl1 (\n" +
 				"  f1 row<q1 bigint, q2 row<t1 timestamp, t2 varchar>, q3 boolean>,\n" +
 				"  WATERMARK FOR f1.q2.t1 AS NOW()\n" +
 				")\n" +
 				"  with (\n" +
 				"    'connector' = 'kafka', \n" +
 				"    'kafka.topic' = 'log.test'\n" +
-				")\n",
-			"CREATE TABLE `TBL1` (\n" +
+				")\n";
+		final String expected = "CREATE TABLE `TBL1` (\n" +
 				"  `F1`  ROW< `Q1` BIGINT, `Q2` ROW< `T1` TIMESTAMP, `T2` VARCHAR >, `Q3` BOOLEAN >,\n" +
 				"  WATERMARK FOR `F1`.`Q2`.`T1` AS `NOW`()\n" +
 				") WITH (\n" +
 				"  'connector' = 'kafka',\n" +
 				"  'kafka.topic' = 'log.test'\n" +
-				")");
+				")";
+		sql(sql).ok(expected);
 	}
 
 	@Test
@@ -434,48 +444,52 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
 	@Test
 	public void testCreateTableWithComplexType() {
-		check("CREATE TABLE tbl1 (\n" +
-			"  a ARRAY<bigint>, \n" +
-			"  b MAP<int, varchar>,\n" +
-			"  c ROW<cc0 int, cc1 float, cc2 varchar>,\n" +
-			"  d MULTISET<varchar>,\n" +
-			"  PRIMARY KEY (a, b) \n" +
-			") with (\n" +
-			"  'x' = 'y', \n" +
-			"  'asd' = 'data'\n" +
-			")\n", "CREATE TABLE `TBL1` (\n" +
-			"  `A`  ARRAY< BIGINT >,\n" +
-			"  `B`  MAP< INTEGER, VARCHAR >,\n" +
-			"  `C`  ROW< `CC0` INTEGER, `CC1` FLOAT, `CC2` VARCHAR >,\n" +
-			"  `D`  MULTISET< VARCHAR >,\n" +
-			"  PRIMARY KEY (`A`, `B`)\n" +
-			") WITH (\n" +
-			"  'x' = 'y',\n" +
-			"  'asd' = 'data'\n" +
-			")");
+		final String sql = "CREATE TABLE tbl1 (\n" +
+				"  a ARRAY<bigint>, \n" +
+				"  b MAP<int, varchar>,\n" +
+				"  c ROW<cc0 int, cc1 float, cc2 varchar>,\n" +
+				"  d MULTISET<varchar>,\n" +
+				"  PRIMARY KEY (a, b) \n" +
+				") with (\n" +
+				"  'x' = 'y', \n" +
+				"  'asd' = 'data'\n" +
+				")\n";
+		final String expected = "CREATE TABLE `TBL1` (\n" +
+				"  `A`  ARRAY< BIGINT >,\n" +
+				"  `B`  MAP< INTEGER, VARCHAR >,\n" +
+				"  `C`  ROW< `CC0` INTEGER, `CC1` FLOAT, `CC2` VARCHAR >,\n" +
+				"  `D`  MULTISET< VARCHAR >,\n" +
+				"  PRIMARY KEY (`A`, `B`)\n" +
+				") WITH (\n" +
+				"  'x' = 'y',\n" +
+				"  'asd' = 'data'\n" +
+				")";
+		sql(sql).ok(expected);
 	}
 
 	@Test
 	public void testCreateTableWithNestedComplexType() {
-		check("CREATE TABLE tbl1 (\n" +
-			"  a ARRAY<ARRAY<bigint>>, \n" +
-			"  b MAP<MAP<int, varchar>, ARRAY<varchar>>,\n" +
-			"  c ROW<cc0 ARRAY<int>, cc1 float, cc2 varchar>,\n" +
-			"  d MULTISET<ARRAY<int>>,\n" +
-			"  PRIMARY KEY (a, b) \n" +
-			") with (\n" +
-			"  'x' = 'y', \n" +
-			"  'asd' = 'data'\n" +
-			")\n", "CREATE TABLE `TBL1` (\n" +
-			"  `A`  ARRAY< ARRAY< BIGINT > >,\n" +
-			"  `B`  MAP< MAP< INTEGER, VARCHAR >, ARRAY< VARCHAR > >,\n" +
-			"  `C`  ROW< `CC0` ARRAY< INTEGER >, `CC1` FLOAT, `CC2` VARCHAR >,\n" +
-			"  `D`  MULTISET< ARRAY< INTEGER > >,\n" +
-			"  PRIMARY KEY (`A`, `B`)\n" +
-			") WITH (\n" +
-			"  'x' = 'y',\n" +
-			"  'asd' = 'data'\n" +
-			")");
+		final String sql = "CREATE TABLE tbl1 (\n" +
+				"  a ARRAY<ARRAY<bigint>>, \n" +
+				"  b MAP<MAP<int, varchar>, ARRAY<varchar>>,\n" +
+				"  c ROW<cc0 ARRAY<int>, cc1 float, cc2 varchar>,\n" +
+				"  d MULTISET<ARRAY<int>>,\n" +
+				"  PRIMARY KEY (a, b) \n" +
+				") with (\n" +
+				"  'x' = 'y', \n" +
+				"  'asd' = 'data'\n" +
+				")\n";
+		final String expected = "CREATE TABLE `TBL1` (\n" +
+				"  `A`  ARRAY< ARRAY< BIGINT > >,\n" +
+				"  `B`  MAP< MAP< INTEGER, VARCHAR >, ARRAY< VARCHAR > >,\n" +
+				"  `C`  ROW< `CC0` ARRAY< INTEGER >, `CC1` FLOAT, `CC2` VARCHAR >,\n" +
+				"  `D`  MULTISET< ARRAY< INTEGER > >,\n" +
+				"  PRIMARY KEY (`A`, `B`)\n" +
+				") WITH (\n" +
+				"  'x' = 'y',\n" +
+				"  'asd' = 'data'\n" +
+				")";
+		sql(sql).ok(expected);
 	}
 
 	@Test
@@ -494,7 +508,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			"  'k1' = 'v1',\n" +
 			"  'k2' = 'v2'\n" +
 			")";
-		check(sql, expected);
+		sql(sql).ok(expected);
 	}
 
 	@Test
@@ -530,7 +544,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
 	@Test
 	public void testColumnSqlString() {
-		String sql = "CREATE TABLE sls_stream (\n" +
+		final String sql = "CREATE TABLE sls_stream (\n" +
 			"  a bigint, \n" +
 			"  f as a + 1, \n" +
 			"  b varchar,\n" +
@@ -542,7 +556,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			"  'x' = 'y', \n" +
 			"  'asd' = 'data'\n" +
 			")\n";
-		String expected = "`A`, (`A` + 1) AS `F`, `B`, "
+		final String expected = "`A`, (`A` + 1) AS `F`, `B`, "
 			+ "`TOTIMESTAMP`(`B`, 'yyyy-MM-dd HH:mm:ss') AS `TS`, "
 			+ "`PROCTIME`() AS `PROC`, `C`";
 		sql(sql).node(new ValidationMatcher()
@@ -552,7 +566,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 	@Test
 	public void testCreateInvalidPartitionedTable() {
 		conformance0 = FlinkSqlConformance.HIVE;
-		String sql = "create table sls_stream1(\n" +
+		final String sql = "create table sls_stream1(\n" +
 			"  a bigint,\n" +
 			"  b VARCHAR,\n" +
 			"  PRIMARY KEY(a, b)\n" +
@@ -567,7 +581,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 	@Test
 	public void testNotAllowedCreatePartition() {
 		conformance0 = FlinkSqlConformance.DEFAULT;
-		String sql = "create table sls_stream1(\n" +
+		final String sql = "create table sls_stream1(\n" +
 				"  a bigint,\n" +
 				"  b VARCHAR\n" +
 				") PARTITIONED BY (a^)^ with ( 'x' = 'y', 'asd' = 'dada')";
@@ -576,7 +590,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
 	@Test
 	public void testCreateTableWithMinusInOptionKey() {
-		String sql = "create table source_table(\n" +
+		final String sql = "create table source_table(\n" +
 			"  a int,\n" +
 			"  b bigint,\n" +
 			"  c string\n" +
@@ -586,7 +600,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			"  'a.b-c-connector.e-f.g' = 'ada',\n" +
 			"  'a.b-c-d.e-1231.g' = 'ada',\n" +
 			"  'a.b-c-d.*' = 'adad')\n";
-		String expected = "CREATE TABLE `SOURCE_TABLE` (\n" +
+		final String expected = "CREATE TABLE `SOURCE_TABLE` (\n" +
 			"  `A`  INTEGER,\n" +
 			"  `B`  BIGINT,\n" +
 			"  `C`  STRING\n" +
@@ -597,12 +611,12 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			"  'a.b-c-d.e-1231.g' = 'ada',\n" +
 			"  'a.b-c-d.*' = 'adad'\n" +
 			")";
-		check(sql, expected);
+		sql(sql).ok(expected);
 	}
 
 	@Test
 	public void testCreateTableWithOptionKeyAsIdentifier() {
-		String sql = "create table source_table(\n" +
+		final String sql = "create table source_table(\n" +
 			"  a int,\n" +
 			"  b bigint,\n" +
 			"  c string\n" +
@@ -614,14 +628,16 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
 	@Test
 	public void testDropTable() {
-		String sql = "DROP table catalog1.db1.tbl1";
-		check(sql, "DROP TABLE `CATALOG1`.`DB1`.`TBL1`");
+		final String sql = "DROP table catalog1.db1.tbl1";
+		final String expected = "DROP TABLE `CATALOG1`.`DB1`.`TBL1`";
+		sql(sql).ok(expected);
 	}
 
 	@Test
 	public void testDropIfExists() {
-		String sql = "DROP table IF EXISTS catalog1.db1.tbl1";
-		check(sql, "DROP TABLE IF EXISTS `CATALOG1`.`DB1`.`TBL1`");
+		final String sql = "DROP table IF EXISTS catalog1.db1.tbl1";
+		final String expected = "DROP TABLE IF EXISTS `CATALOG1`.`DB1`.`TBL1`";
+		sql(sql).ok(expected);
 	}
 
 	@Test
@@ -658,7 +674,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
 	@Test
 	public void testInsertExtendedColumnAsStaticPartition1() {
-		String expected = "INSERT INTO `EMPS` EXTEND (`Z` BOOLEAN) (`X`, `Y`)\n"
+		final String expected = "INSERT INTO `EMPS` EXTEND (`Z` BOOLEAN) (`X`, `Y`)\n"
 			+ "PARTITION (`Z` = 'ab')\n"
 			+ "(SELECT *\n"
 			+ "FROM `EMPS`)";
@@ -676,17 +692,19 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 	@Test
 	public void testInsertOverwrite() {
 		// non-partitioned
-		check("INSERT OVERWRITE myDB.myTbl SELECT * FROM src",
-			"INSERT OVERWRITE `MYDB`.`MYTBL`\n"
+		final String sql = "INSERT OVERWRITE myDB.myTbl SELECT * FROM src";
+		final String expected = "INSERT OVERWRITE `MYDB`.`MYTBL`\n"
 				+ "(SELECT *\n"
-				+ "FROM `SRC`)");
+				+ "FROM `SRC`)";
+		sql(sql).ok(expected);
 
 		// partitioned
-		check("INSERT OVERWRITE myTbl PARTITION (p1='v1',p2='v2') SELECT * FROM src",
-			"INSERT OVERWRITE `MYTBL`\n"
+		final String sql1 = "INSERT OVERWRITE myTbl PARTITION (p1='v1',p2='v2') SELECT * FROM src";
+		final String expected1 = "INSERT OVERWRITE `MYTBL`\n"
 				+ "PARTITION (`P1` = 'v1', `P2` = 'v2')\n"
 				+ "(SELECT *\n"
-				+ "FROM `SRC`)");
+				+ "FROM `SRC`)";
+		sql(sql1).ok(expected1);
 	}
 
 	@Test
@@ -702,7 +720,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			"AS\n" +
 			"SELECT `COL1`\n" +
 			"FROM `TBL`";
-		check(sql, expected);
+		sql(sql).ok(expected);
 	}
 
 	@Test
@@ -713,7 +731,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			"AS\n" +
 			"SELECT `COL1`\n" +
 			"FROM `TBL`";
-		check(sql, expected);
+		sql(sql).ok(expected);
 	}
 
 	@Test
@@ -723,7 +741,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			"AS\n" +
 			"SELECT `COL3`, `COL4`\n" +
 			"FROM `TBL`";
-		check(sql, expected);
+		sql(sql).ok(expected);
 	}
 
 	@Test
@@ -731,31 +749,41 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 		final String sql = "create view v(^*^) COMMENT 'this is a view' as select col1 from tbl";
 		final String expected = "(?s).*Encountered \"\\*\" at line 1, column 15.*";
 
-		checkFails(sql, expected);
+		sql(sql).fails(expected);
 	}
 
 	@Test
 	public void testDropView() {
 		final String sql = "DROP VIEW IF EXISTS view_name";
-		check(sql, "DROP VIEW IF EXISTS `VIEW_NAME`");
+		final String expected = "DROP VIEW IF EXISTS `VIEW_NAME`";
+		sql(sql).ok(expected);
 	}
 
 	// Override the test because our ROW field type default is nullable,
 	// which is different with Calcite.
 	@Override
 	public void testCastAsRowType() {
-		checkExp("cast(a as row(f0 int, f1 varchar))",
-			"CAST(`A` AS ROW(`F0` INTEGER, `F1` VARCHAR))");
-		checkExp("cast(a as row(f0 int not null, f1 varchar null))",
-			"CAST(`A` AS ROW(`F0` INTEGER NOT NULL, `F1` VARCHAR))");
-		checkExp("cast(a as row(f0 row(ff0 int not null, ff1 varchar null) null," +
-				" f1 timestamp not null))",
-			"CAST(`A` AS ROW(`F0` ROW(`FF0` INTEGER NOT NULL, `FF1` VARCHAR)," +
-				" `F1` TIMESTAMP NOT NULL))");
-		checkExp("cast(a as row(f0 bigint not null, f1 decimal null) array)",
-			"CAST(`A` AS ROW(`F0` BIGINT NOT NULL, `F1` DECIMAL) ARRAY)");
-		checkExp("cast(a as row(f0 varchar not null, f1 timestamp null) multiset)",
-			"CAST(`A` AS ROW(`F0` VARCHAR NOT NULL, `F1` TIMESTAMP) MULTISET)");
+		final String expr = "cast(a as row(f0 int, f1 varchar))";
+		final String expected = "CAST(`A` AS ROW(`F0` INTEGER, `F1` VARCHAR))";
+		expr(expr).ok(expected);
+
+		final String expr1 = "cast(a as row(f0 int not null, f1 varchar null))";
+		final String expected1 = "CAST(`A` AS ROW(`F0` INTEGER NOT NULL, `F1` VARCHAR))";
+		expr(expr1).ok(expected1);
+
+		final String expr2 = "cast(a as row(f0 row(ff0 int not null, ff1 varchar null) null,"
+				+ " f1 timestamp not null))";
+		final String expected2 = "CAST(`A` AS ROW(`F0` ROW(`FF0` INTEGER NOT NULL, `FF1` VARCHAR),"
+				+ " `F1` TIMESTAMP NOT NULL))";
+		expr(expr2).ok(expected2);
+
+		final String expr3 = "cast(a as row(f0 bigint not null, f1 decimal null) array)";
+		final String expected3 = "CAST(`A` AS ROW(`F0` BIGINT NOT NULL, `F1` DECIMAL) ARRAY)";
+		expr(expr3).ok(expected3);
+
+		final String expr4 = "cast(a as row(f0 varchar not null, f1 timestamp null) multiset)";
+		final String expected4 = "CAST(`A` AS ROW(`F0` VARCHAR NOT NULL, `F1` TIMESTAMP) MULTISET)";
+		expr(expr4).ok(expected4);
 	}
 
 	@Test
@@ -772,44 +800,50 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
 	@Test
 	public void testCreateFunction() {
-		check("create function catalog1.db1.function1 as 'org.apache.fink.function.function1'",
-			"CREATE FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
+		sql("create function catalog1.db1.function1 as 'org.apache.fink.function.function1'")
+				.ok("CREATE FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
 
-		check("create temporary function catalog1.db1.function1 as 'org.apache.fink.function.function1'",
-			"CREATE TEMPORARY FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
+		sql("create temporary function catalog1.db1.function1 as 'org.apache.fink.function.function1'")
+				.ok("CREATE TEMPORARY FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
 
-		check("create temporary system function catalog1.db1.function1 as 'org.apache.fink.function.function1'",
-			"CREATE TEMPORARY SYSTEM FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
+		sql("create temporary system function catalog1.db1.function1 as 'org.apache.fink.function.function1'")
+				.ok("CREATE TEMPORARY SYSTEM FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
 
-		check("create temporary function db1.function1 as 'org.apache.fink.function.function1'",
-			"CREATE TEMPORARY FUNCTION `DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
+		sql("create temporary function db1.function1 as 'org.apache.fink.function.function1'")
+				.ok("CREATE TEMPORARY FUNCTION `DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
 
-		check("create temporary function function1 as 'org.apache.fink.function.function1'",
-			"CREATE TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
+		sql("create temporary function function1 as 'org.apache.fink.function.function1'")
+				.ok("CREATE TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
 
-		check("create temporary function if not exists catalog1.db1.function1 as 'org.apache.fink.function.function1'",
-			"CREATE TEMPORARY FUNCTION IF NOT EXISTS `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
+		sql("create temporary function if not exists catalog1.db1.function1 as 'org.apache.fink.function.function1'")
+				.ok("CREATE TEMPORARY FUNCTION IF NOT EXISTS `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
 
-		check("create temporary function function1 as 'org.apache.fink.function.function1' language java",
-			"CREATE TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE JAVA");
+		sql("create temporary function function1 as 'org.apache.fink.function.function1' language java")
+				.ok("CREATE TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE JAVA");
 
-		check("create temporary system function  function1 as 'org.apache.fink.function.function1' language scala",
-			"CREATE TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE SCALA");
+		sql("create temporary system function  function1 as 'org.apache.fink.function.function1' language scala")
+				.ok("CREATE TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE SCALA");
 	}
 
 	@Test
 	public void testDropTemporaryFunction() {
-		check("drop temporary function catalog1.db1.function1",
-			"DROP TEMPORARY FUNCTION `CATALOG1`.`DB1`.`FUNCTION1`");
+		sql("drop temporary function catalog1.db1.function1")
+				.ok("DROP TEMPORARY FUNCTION `CATALOG1`.`DB1`.`FUNCTION1`");
 
-		check("drop temporary system function catalog1.db1.function1",
-			"DROP TEMPORARY SYSTEM FUNCTION `CATALOG1`.`DB1`.`FUNCTION1`");
+		sql("drop temporary system function catalog1.db1.function1")
+				.ok("DROP TEMPORARY SYSTEM FUNCTION `CATALOG1`.`DB1`.`FUNCTION1`");
 
-		check("drop temporary function if exists catalog1.db1.function1",
-			"DROP TEMPORARY FUNCTION IF EXISTS `CATALOG1`.`DB1`.`FUNCTION1`");
+		sql("drop temporary function if exists catalog1.db1.function1")
+				.ok("DROP TEMPORARY FUNCTION IF EXISTS `CATALOG1`.`DB1`.`FUNCTION1`");
 
-		check("drop temporary system function if exists catalog1.db1.function1",
-			"DROP TEMPORARY SYSTEM FUNCTION IF EXISTS `CATALOG1`.`DB1`.`FUNCTION1`");
+		sql("drop temporary system function if exists catalog1.db1.function1")
+				.ok("DROP TEMPORARY SYSTEM FUNCTION IF EXISTS `CATALOG1`.`DB1`.`FUNCTION1`");
+	}
+
+	@Override
+	public void testTableHintsInInsert() {
+		// Override the superclass tests because Flink insert parse block
+		// is totally customized, and the hints are not supported yet.
 	}
 
 	/** Matcher that invokes the #validate() of the {@link ExtendedSqlNode} instance. **/