Posted to commits@flink.apache.org by tw...@apache.org on 2019/08/02 15:18:59 UTC

[flink] branch release-1.9 updated: [FLINK-13335][sql-parser] Bring the SQL CREATE TABLE DDL closer to FLIP-37

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new e29bf2a  [FLINK-13335][sql-parser] Bring the SQL CREATE TABLE DDL closer to FLIP-37
e29bf2a is described below

commit e29bf2ac6b4da32d4234b38f7445444a01471048
Author: yuzhao.cyz <yu...@alibaba-inc.com>
AuthorDate: Fri Jul 26 22:37:53 2019 +0800

    [FLINK-13335][sql-parser] Bring the SQL CREATE TABLE DDL closer to FLIP-37
    
    This brings the SQL DDL closer to FLIP-37. However, there are a couple of
    known limitations.
    
    Currently unsupported features:
    - INTERVAL
    - ROW with comments
    - ANY
    - NULL
    - NOT NULL/NULL for top-level types
    - ignoring collation/charset
    - VARCHAR without length (=VARCHAR(1))
    - TIMESTAMP WITH TIME ZONE
    - user-defined types
    - data types in non-DDL parts (e.g. CAST(f AS STRING))
    
    This closes #9243.
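
    For illustration only, a sketch of the kind of CREATE TABLE statement
    this grammar targets (table name and property are made up):

        CREATE TABLE t1 (
          f0 STRING,
          f1 BYTES,
          f2 TIMESTAMP(3) WITH LOCAL TIME ZONE,
          f3 ROW<r0 STRING, r1 INT NOT NULL>,
          f4 INT NOT NULL ARRAY
        ) WITH (
          k1 = 'v1'
        )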
---
 docs/dev/table/sql.md                              |   2 +-
 .../src/main/codegen/data/Parser.tdd               |  27 +-
 .../src/main/codegen/includes/parserImpls.ftl      | 294 +++++++++++++-
 .../flink/sql/parser/FlinkSqlDataTypeSpec.java     | 325 +++++++++++++++
 .../type/{SqlMapType.java => SqlBytesType.java}    |  31 +-
 .../apache/flink/sql/parser/type/SqlMapType.java   |   6 +-
 .../type/{SqlMapType.java => SqlMultisetType.java} |  28 +-
 .../apache/flink/sql/parser/type/SqlRowType.java   |  34 +-
 .../type/{SqlMapType.java => SqlStringType.java}   |  31 +-
 .../apache/flink/sql/parser/type/SqlTimeType.java  |  73 ++++
 .../flink/sql/parser/type/SqlTimestampType.java    |  73 ++++
 .../java/org/apache/flink/sql/parser/Fixture.java  | 115 ++++++
 .../flink/sql/parser/FlinkDDLDataTypeTest.java     | 446 +++++++++++++++++++++
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  35 +-
 .../runtime/stream/sql/WindowAggregateITCase.scala |  10 +-
 .../runtime/stream/TimeAttributesITCase.scala      |   2 +-
 16 files changed, 1394 insertions(+), 138 deletions(-)

diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 3155915..e607716 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1085,7 +1085,7 @@ Although not every SQL feature is implemented yet, some string combinations are
 
 {% highlight sql %}
 
-A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICS, CHARACTERS, CHARACTER_LENGTH, CHA [...]
+A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, BYTES, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICS, CHARACTERS, CHARACTER_LENG [...]
 
 {% endhighlight %}
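
Since STRING and BYTES join this reserved keyword list, they can no longer be
used as bare identifiers; as with the other reserved words, a hypothetical
field of that name would need backtick escaping:

    CREATE TABLE t1 (
      `STRING` INT
    ) WITH (
      k1 = 'v1'
    )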
 
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 77a8e58..5cefc93 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -31,10 +31,16 @@
     "org.apache.flink.sql.parser.dml.RichSqlInsert",
     "org.apache.flink.sql.parser.dml.RichSqlInsertKeyword",
     "org.apache.flink.sql.parser.type.SqlArrayType",
+    "org.apache.flink.sql.parser.type.SqlBytesType",
     "org.apache.flink.sql.parser.type.SqlMapType",
+    "org.apache.flink.sql.parser.type.SqlMultisetType",
     "org.apache.flink.sql.parser.type.SqlRowType",
+    "org.apache.flink.sql.parser.type.SqlStringType",
+    "org.apache.flink.sql.parser.type.SqlTimeType",
+    "org.apache.flink.sql.parser.type.SqlTimestampType",
     "org.apache.flink.sql.parser.utils.SqlTimeUnit",
     "org.apache.flink.sql.parser.validate.FlinkSqlConformance",
+    "org.apache.flink.sql.parser.FlinkSqlDataTypeSpec",
     "org.apache.flink.sql.parser.SqlProperty",
     "org.apache.calcite.sql.SqlDrop",
     "org.apache.calcite.sql.SqlCreate",
@@ -53,7 +59,9 @@
     "FROM_SOURCE",
     "BOUNDED",
     "DELAY",
-    "OVERWRITE"
+    "OVERWRITE",
+    "STRING",
+    "BYTES"
   ]
 
   # List of keywords from "keywords" section that are not reserved.
@@ -384,13 +392,24 @@
   literalParserMethods: [
   ]
 
-  # List of methods for parsing custom data types.
+  # List of methods for parsing DDL-supported data types.
   # Return type of method implementation should be "SqlIdentifier".
   # Example: SqlParseTimeStampZ().
-  dataTypeParserMethods: [
+  flinkDataTypeParserMethods: [
     "SqlArrayType()",
+    "SqlMultisetType()",
     "SqlMapType()",
-    "SqlRowType()"
+    "SqlRowType()",
+    "SqlStringType()",
+    "SqlBytesType()",
+    "SqlTimestampType()",
+    "SqlTimeType()"
+  ]
+
+  # List of methods for parsing custom data types.
+  # Return type of method implementation should be "SqlIdentifier".
+  # Example: SqlParseTimeStampZ().
+  dataTypeParserMethods: [
   ]
 
   # List of methods for parsing builtin function calls.
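
With this split, DDL column types go through the Flink-specific methods first
while the generic Calcite hook stays empty; a column type such as

    f0 TIMESTAMP(3) WITH LOCAL TIME ZONE

is now parsed by SqlTimestampType() instead of Calcite's built-in type rules.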
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 95621fe..ae66846 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -15,6 +15,20 @@
 // limitations under the License.
 -->
 
+/**
+* Parse the nullability option; defaults to nullable.
+*/
+boolean NullableOpt() :
+{
+}
+{
+    <NULL> { return true; }
+|
+    <NOT> <NULL> { return false; }
+|
+    { return true; }
+}
+
 void TableColumn(TableCreationContext context) :
 {
 }
@@ -55,14 +69,8 @@ void TableColumn2(List<SqlNode> list) :
 }
 {
     name = SimpleIdentifier()
-    type = DataType()
-    (
-        <NULL> { type = type.withNullable(true); }
-    |
-        <NOT> <NULL> { type = type.withNullable(false); }
-    |
-        { type = type.withNullable(true); }
-    )
+    <#-- #FlinkDataType already takes care of the nullable attribute. -->
+    type = FlinkDataType()
     [ <COMMENT> <QUOTED_STRING> {
         String p = SqlParserUtil.parseString(token.image);
         comment = SqlLiteral.createCharString(p, getPos());
@@ -367,53 +375,295 @@ SqlDrop SqlDropView(Span s, boolean replace) :
     }
 }
 
+SqlIdentifier FlinkCollectionsTypeName() :
+{
+}
+{
+    LOOKAHEAD(2)
+    <MULTISET> {
+        return new SqlIdentifier(SqlTypeName.MULTISET.name(), getPos());
+    }
+|
+    <ARRAY> {
+        return new SqlIdentifier(SqlTypeName.ARRAY.name(), getPos());
+    }
+}
+
+SqlIdentifier FlinkTypeName() :
+{
+    final SqlTypeName sqlTypeName;
+    final SqlIdentifier typeName;
+    final Span s = Span.of();
+}
+{
+    (
+<#-- additional types are included here -->
+<#-- custom data types are tried before the Calcite core data types -->
+<#list parser.flinkDataTypeParserMethods as method>
+    <#if (method?index > 0)>
+    |
+    </#if>
+        LOOKAHEAD(2)
+        typeName = ${method}
+</#list>
+    |
+        LOOKAHEAD(2)
+        sqlTypeName = SqlTypeName(s) {
+            typeName = new SqlIdentifier(sqlTypeName.name(), s.end(this));
+        }
+    |
+        LOOKAHEAD(2)
+        typeName = FlinkCollectionsTypeName()
+    |
+        typeName = CompoundIdentifier() {
+            throw new ParseException("UDT in DDL is not supported yet.");
+        }
+    )
+    {
+        return typeName;
+    }
+}
+
+/**
+* Parse a Flink data type with its nullability: NULL -> nullable, NOT NULL -> not nullable.
+* Defaults to nullable.
+*/
+SqlDataTypeSpec FlinkDataType() :
+{
+    final SqlIdentifier typeName;
+    SqlIdentifier collectionTypeName = null;
+    int scale = -1;
+    int precision = -1;
+    String charSetName = null;
+    final Span s;
+    boolean nullable = true;
+    boolean elementNullable = true;
+}
+{
+    typeName = FlinkTypeName() {
+        s = span();
+    }
+    [
+        <LPAREN>
+        precision = UnsignedIntLiteral()
+        [
+            <COMMA>
+            scale = UnsignedIntLiteral()
+        ]
+        <RPAREN>
+    ]
+    elementNullable = NullableOpt()
+    [
+        collectionTypeName = FlinkCollectionsTypeName()
+        nullable = NullableOpt()
+    ]
+    {
+        if (null != collectionTypeName) {
+            return new FlinkSqlDataTypeSpec(
+                    collectionTypeName,
+                    typeName,
+                    precision,
+                    scale,
+                    charSetName,
+                    nullable,
+                    elementNullable,
+                    s.end(collectionTypeName));
+        }
+        nullable = elementNullable;
+        return new FlinkSqlDataTypeSpec(typeName,
+                precision,
+                scale,
+                charSetName,
+                null,
+                nullable,
+                elementNullable,
+                s.end(this));
+    }
+}
+
+SqlIdentifier SqlStringType() :
+{
+}
+{
+    <STRING> { return new SqlStringType(getPos()); }
+}
+
+SqlIdentifier SqlBytesType() :
+{
+}
+{
+    <BYTES> { return new SqlBytesType(getPos()); }
+}
+
+boolean WithLocalTimeZone() :
+{
+}
+{
+    <WITHOUT> <TIME> <ZONE> { return false; }
+|
+    <WITH>
+    (
+         <LOCAL> <TIME> <ZONE> { return true; }
+    |
+        <TIME> <ZONE> {
+            throw new ParseException("'WITH TIME ZONE' is not supported yet, options: " +
+                "'WITHOUT TIME ZONE', 'WITH LOCAL TIME ZONE'.");
+        }
+    )
+|
+    { return false; }
+}
+
+SqlIdentifier SqlTimeType() :
+{
+    int precision = -1;
+    boolean withLocalTimeZone = false;
+}
+{
+    <TIME>
+    (
+        <LPAREN> precision = UnsignedIntLiteral() <RPAREN>
+    |
+        { precision = -1; }
+    )
+    withLocalTimeZone = WithLocalTimeZone()
+    { return new SqlTimeType(getPos(), precision, withLocalTimeZone); }
+}
+
+SqlIdentifier SqlTimestampType() :
+{
+    int precision = -1;
+    boolean withLocalTimeZone = false;
+}
+{
+    <TIMESTAMP>
+    (
+        <LPAREN> precision = UnsignedIntLiteral() <RPAREN>
+    |
+        { precision = -1; }
+    )
+    withLocalTimeZone = WithLocalTimeZone()
+    { return new SqlTimestampType(getPos(), precision, withLocalTimeZone); }
+}
+
 SqlIdentifier SqlArrayType() :
 {
     SqlParserPos pos;
     SqlDataTypeSpec elementType;
+    boolean nullable = true;
 }
 {
     <ARRAY> { pos = getPos(); }
-    <LT> elementType = DataType()
+    <LT>
+    elementType = FlinkDataType()
     <GT>
     {
         return new SqlArrayType(pos, elementType);
     }
 }
 
+SqlIdentifier SqlMultisetType() :
+{
+    SqlParserPos pos;
+    SqlDataTypeSpec elementType;
+    boolean nullable = true;
+}
+{
+    <MULTISET> { pos = getPos(); }
+    <LT>
+    elementType = FlinkDataType()
+    <GT>
+    {
+        return new SqlMultisetType(pos, elementType);
+    }
+}
+
 SqlIdentifier SqlMapType() :
 {
     SqlDataTypeSpec keyType;
     SqlDataTypeSpec valType;
+    boolean nullable = true;
 }
 {
     <MAP>
-    <LT> keyType = DataType()
-    <COMMA> valType = DataType()
+    <LT>
+    keyType = FlinkDataType()
+    <COMMA>
+    valType = FlinkDataType()
     <GT>
     {
         return new SqlMapType(getPos(), keyType, valType);
     }
 }
 
+/**
+* Parse a "name1 type1 ['this is a comment'], name2 type2 ..." list.
+*/
+void FieldNameTypeCommaList(
+        List<SqlIdentifier> fieldNames,
+        List<SqlDataTypeSpec> fieldTypes,
+        List<SqlCharStringLiteral> comments) :
+{
+    SqlIdentifier fName;
+    SqlDataTypeSpec fType;
+}
+{
+    [
+        fName = SimpleIdentifier()
+        fType = FlinkDataType()
+        {
+            fieldNames.add(fName);
+            fieldTypes.add(fType);
+        }
+        (
+            <QUOTED_STRING> {
+                String p = SqlParserUtil.parseString(token.image);
+                comments.add(SqlLiteral.createCharString(p, getPos()));
+            }
+        |
+            { comments.add(null); }
+        )
+    ]
+    (
+        <COMMA>
+        fName = SimpleIdentifier()
+        fType = FlinkDataType()
+        {
+            fieldNames.add(fName);
+            fieldTypes.add(fType);
+        }
+        (
+            <QUOTED_STRING> {
+                String p = SqlParserUtil.parseString(token.image);
+                comments.add(SqlLiteral.createCharString(p, getPos()));
+            }
+        |
+            { comments.add(null); }
+        )
+    )*
+}
+
+/**
+* Parse a Row type; both Row(name1 type1, name2 type2) and Row<name1 type1, name2 type2>
+* are supported. Every field type can have a `NULL` or `NOT NULL` suffix to indicate
+* whether it is nullable, e.g. Row(f0 int not null, f1 varchar null).
+*/
 SqlIdentifier SqlRowType() :
 {
-    SqlParserPos pos;
     List<SqlIdentifier> fieldNames = new ArrayList<SqlIdentifier>();
     List<SqlDataTypeSpec> fieldTypes = new ArrayList<SqlDataTypeSpec>();
+    List<SqlCharStringLiteral> comments = new ArrayList<SqlCharStringLiteral>();
 }
 {
-    <ROW> { pos = getPos(); SqlIdentifier fName; SqlDataTypeSpec fType;}
-    <LT>
-    fName = SimpleIdentifier() <COLON> fType = DataType()
-    { fieldNames.add(fName); fieldTypes.add(fType); }
+    <ROW>
     (
-        <COMMA>
-        fName = SimpleIdentifier() <COLON> fType = DataType()
-        { fieldNames.add(fName); fieldTypes.add(fType); }
-    )*
-    <GT>
+        <NE>
+    |
+        <LT> FieldNameTypeCommaList(fieldNames, fieldTypes, comments) <GT>
+    |
+        <LPAREN> FieldNameTypeCommaList(fieldNames, fieldTypes, comments) <RPAREN>
+    )
     {
-        return new SqlRowType(pos, fieldNames, fieldTypes);
+        return new SqlRowType(getPos(), fieldNames, fieldTypes, comments);
     }
 }
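
A sketch of row types the new production accepts: both bracket styles, an
empty row, and optional per-field comments and nullability:

    ROW<f0 INT NOT NULL 'field comment', f1 STRING>
    ROW(f0 INT NOT NULL, f1 STRING 'field comment')
    ROW<>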
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/FlinkSqlDataTypeSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/FlinkSqlDataTypeSpec.java
new file mode 100644
index 0000000..a3797d7
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/FlinkSqlDataTypeSpec.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.flink.sql.parser.type.ExtendedSqlType;
+import org.apache.flink.sql.parser.type.SqlArrayType;
+import org.apache.flink.sql.parser.type.SqlBytesType;
+import org.apache.flink.sql.parser.type.SqlMapType;
+import org.apache.flink.sql.parser.type.SqlMultisetType;
+import org.apache.flink.sql.parser.type.SqlRowType;
+import org.apache.flink.sql.parser.type.SqlStringType;
+import org.apache.flink.sql.parser.type.SqlTimeType;
+import org.apache.flink.sql.parser.type.SqlTimestampType;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCollation;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.util.Util;
+
+import java.nio.charset.Charset;
+import java.util.Objects;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+/**
+ * Represents a SQL data type specification in a parse tree.
+ *
+ * <p>A <code>SqlDataTypeSpec</code> is immutable; once created, you cannot
+ * change any of the fields.</p>
+ *
+ * <p>This class is an extension to {@link SqlDataTypeSpec} that supports
+ * complex type expressions like:</p>
+ *
+ * <blockquote><code>ROW(<br>
+ *   foo NUMBER(5, 2) NOT NULL,<br>
+ *   rec ROW(b BOOLEAN, i MyUDT NOT NULL))</code></blockquote>
+ *
+ * <p>Once <a href="https://issues.apache.org/jira/browse/CALCITE-3213">CALCITE-3213</a>
+ * is resolved, this class can be removed.
+ */
+public class FlinkSqlDataTypeSpec extends SqlDataTypeSpec {
+	// Flag indicating whether the element type is nullable when this type is a collection type.
+	// Currently, "collection type" means the ARRAY and MULTISET types.
+	private Boolean elementNullable;
+
+	public FlinkSqlDataTypeSpec(
+		SqlIdentifier collectionsTypeName,
+		SqlIdentifier typeName,
+		int precision,
+		int scale,
+		String charSetName,
+		Boolean nullable,
+		Boolean elementNullable,
+		SqlParserPos pos) {
+		super(collectionsTypeName, typeName, precision, scale,
+			charSetName, null, nullable, pos);
+		this.elementNullable = elementNullable;
+	}
+
+	public FlinkSqlDataTypeSpec(
+		SqlIdentifier collectionsTypeName,
+		SqlIdentifier typeName,
+		int precision,
+		int scale,
+		String charSetName,
+		TimeZone timeZone,
+		Boolean nullable,
+		Boolean elementNullable,
+		SqlParserPos pos) {
+		super(collectionsTypeName, typeName, precision, scale,
+			charSetName, timeZone, nullable, pos);
+		this.elementNullable = elementNullable;
+	}
+
+	public FlinkSqlDataTypeSpec(
+		SqlIdentifier typeName,
+		int precision,
+		int scale,
+		String charSetName,
+		TimeZone timeZone,
+		Boolean nullable,
+		Boolean elementNullable,
+		SqlParserPos pos) {
+		super(null, typeName, precision, scale,
+			charSetName, timeZone, nullable, pos);
+		this.elementNullable = elementNullable;
+	}
+
+	@Override
+	public SqlNode clone(SqlParserPos pos) {
+		return (getCollectionsTypeName() != null)
+			? new FlinkSqlDataTypeSpec(getCollectionsTypeName(), getTypeName(), getPrecision(),
+			getScale(), getCharSetName(), getNullable(), this.elementNullable, pos)
+			: new FlinkSqlDataTypeSpec(getTypeName(), getPrecision(), getScale(),
+			getCharSetName(), getTimeZone(), getNullable(), this.elementNullable, pos);
+	}
+
+	/** Returns a copy of this data type specification with a given
+	 * nullability. */
+	@Override
+	public SqlDataTypeSpec withNullable(Boolean nullable) {
+		if (Objects.equals(nullable, this.getNullable())) {
+			return this;
+		}
+		return new FlinkSqlDataTypeSpec(getCollectionsTypeName(), getTypeName(),
+			getPrecision(), getScale(), getCharSetName(), getTimeZone(), nullable,
+			this.elementNullable, getParserPosition());
+	}
+
+	@Override
+	public RelDataType deriveType(RelDataTypeFactory typeFactory) {
+		// Default to be nullable.
+		return this.deriveType(typeFactory, true);
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		final SqlIdentifier typeName = getTypeName();
+		String name = typeName.getSimple();
+		if (typeName instanceof ExtendedSqlType) {
+			typeName.unparse(writer, leftPrec, rightPrec);
+		} else if (SqlTypeName.get(name) != null) {
+			SqlTypeName sqlTypeName = SqlTypeName.get(name);
+			writer.keyword(name);
+			if (sqlTypeName.allowsPrec() && this.getPrecision() >= 0) {
+				SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "(", ")");
+				writer.print(this.getPrecision());
+				if (sqlTypeName.allowsScale() && this.getScale() >= 0) {
+					writer.sep(",", true);
+					writer.print(this.getScale());
+				}
+
+				writer.endList(frame);
+			}
+
+			if (this.getCharSetName() != null) {
+				writer.keyword("CHARACTER SET");
+				writer.identifier(this.getCharSetName(), false);
+			}
+
+			if (this.getCollectionsTypeName() != null) {
+				// Fix up nullable attribute if this is a collection type.
+				if (elementNullable != null && !elementNullable) {
+					writer.keyword("NOT NULL");
+				}
+				writer.keyword(this.getCollectionsTypeName().getSimple());
+			}
+		} else if (name.startsWith("_")) {
+			writer.keyword(name.substring(1));
+		} else {
+			this.getTypeName().unparse(writer, leftPrec, rightPrec);
+		}
+		if (getNullable() != null && !getNullable()) {
+			writer.keyword("NOT NULL");
+		}
+	}
+
+	@Override
+	public RelDataType deriveType(RelDataTypeFactory typeFactory, boolean nullable) {
+		final SqlIdentifier typeName = getTypeName();
+		if (!typeName.isSimple()) {
+			return null;
+		}
+		final String name = typeName.getSimple();
+		final SqlTypeName sqlTypeName = SqlTypeName.get(name);
+		// Try to get Flink custom data type first.
+		RelDataType type = getExtendedType(typeFactory, typeName);
+		if (type == null) {
+			if (sqlTypeName == null) {
+				return null;
+			} else {
+				// NOTE jvs 15-Jan-2009:  earlier validation is supposed to
+				// have caught these, which is why it's OK for them
+				// to be assertions rather than user-level exceptions.
+				final int precision = getPrecision();
+				final int scale = getScale();
+				if ((precision >= 0) && (scale >= 0)) {
+					assert sqlTypeName.allowsPrecScale(true, true);
+					type = typeFactory.createSqlType(sqlTypeName, precision, scale);
+				} else if (precision >= 0) {
+					assert sqlTypeName.allowsPrecNoScale();
+					type = typeFactory.createSqlType(sqlTypeName, precision);
+				} else {
+					assert sqlTypeName.allowsNoPrecNoScale();
+					type = typeFactory.createSqlType(sqlTypeName);
+				}
+			}
+		}
+
+		if (SqlTypeUtil.inCharFamily(type)) {
+			// Applying Syntax rule 10 from SQL:99 spec section 6.22 "If TD is a
+			// fixed-length, variable-length or large object character string,
+			// then the collating sequence of the result of the <cast
+			// specification> is the default collating sequence for the
+			// character repertoire of TD and the result of the <cast
+			// specification> has the Coercible coercibility characteristic."
+			SqlCollation collation = SqlCollation.COERCIBLE;
+
+			Charset charset;
+			final String charSetName = getCharSetName();
+			if (null == charSetName) {
+				charset = typeFactory.getDefaultCharset();
+			} else {
+				String javaCharSetName =
+					Objects.requireNonNull(
+						SqlUtil.translateCharacterSetName(charSetName), charSetName);
+				charset = Charset.forName(javaCharSetName);
+			}
+			type =
+				typeFactory.createTypeWithCharsetAndCollation(
+					type,
+					charset,
+					collation);
+		}
+
+		final SqlIdentifier collectionsTypeName = getCollectionsTypeName();
+		if (null != collectionsTypeName) {
+			// Fix the nullability of the element type first.
+			boolean elementNullable = true;
+			if (this.elementNullable != null) {
+				elementNullable = this.elementNullable;
+			}
+			type = typeFactory.createTypeWithNullability(type, elementNullable);
+
+			final String collectionName = collectionsTypeName.getSimple();
+			final SqlTypeName collectionsSqlTypeName =
+				Objects.requireNonNull(SqlTypeName.get(collectionName),
+					collectionName);
+
+			switch (collectionsSqlTypeName) {
+			case MULTISET:
+				type = typeFactory.createMultisetType(type, -1);
+				break;
+			case ARRAY:
+				type = typeFactory.createArrayType(type, -1);
+				break;
+			default:
+				throw Util.unexpected(collectionsSqlTypeName);
+			}
+		}
+
+		// Fix the nullability of this type.
+		if (this.getNullable() != null) {
+			nullable = this.getNullable();
+		}
+		type = typeFactory.createTypeWithNullability(type, nullable);
+
+		return type;
+	}
+
+	private RelDataType getExtendedType(RelDataTypeFactory typeFactory, SqlIdentifier typeName) {
+		// quick check.
+		if (!(typeName instanceof ExtendedSqlType)) {
+			return null;
+		}
+		if (typeName instanceof SqlBytesType) {
+			return typeFactory.createSqlType(SqlTypeName.VARBINARY, Integer.MAX_VALUE);
+		} else if (typeName instanceof SqlStringType) {
+			return typeFactory.createSqlType(SqlTypeName.VARCHAR, Integer.MAX_VALUE);
+		} else if (typeName instanceof SqlArrayType) {
+			final SqlArrayType arrayType = (SqlArrayType) typeName;
+			return typeFactory.createArrayType(arrayType.getElementType()
+				.deriveType(typeFactory), -1);
+		} else if (typeName instanceof SqlMultisetType) {
+			final SqlMultisetType multiSetType = (SqlMultisetType) typeName;
+			return typeFactory.createMultisetType(multiSetType.getElementType()
+				.deriveType(typeFactory), -1);
+		} else if (typeName instanceof SqlMapType) {
+			final SqlMapType mapType = (SqlMapType) typeName;
+			return typeFactory.createMapType(
+				mapType.getKeyType().deriveType(typeFactory),
+				mapType.getValType().deriveType(typeFactory));
+		} else if (typeName instanceof SqlRowType) {
+			final SqlRowType rowType = (SqlRowType) typeName;
+			return typeFactory.createStructType(
+				rowType.getFieldTypes().stream().map(ft -> ft.deriveType(typeFactory))
+					.collect(Collectors.toList()),
+				rowType.getFieldNames().stream().map(SqlIdentifier::getSimple)
+					.collect(Collectors.toList()));
+		} else if (typeName instanceof SqlTimeType) {
+			final SqlTimeType zonedTimeType = (SqlTimeType) typeName;
+			if (zonedTimeType.getPrecision() >= 0) {
+				return typeFactory.createSqlType(zonedTimeType.getSqlTypeName(),
+					zonedTimeType.getPrecision());
+			} else {
+				// Use default precision.
+				return typeFactory.createSqlType(zonedTimeType.getSqlTypeName());
+			}
+		} else if (typeName instanceof SqlTimestampType) {
+			final SqlTimestampType zonedTimestampType = (SqlTimestampType) typeName;
+			if (zonedTimestampType.getPrecision() >= 0) {
+				return typeFactory.createSqlType(zonedTimestampType.getSqlTypeName(),
+					zonedTimestampType.getPrecision());
+			} else {
+				// Use default precision.
+				return typeFactory.createSqlType(zonedTimestampType.getSqlTypeName());
+			}
+		}
+		return null;
+	}
+}
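
As a worked example of the nullability rules above, the collection suffix
syntax derives types as follows (matching the expectations in
FlinkDDLDataTypeTest below):

    INT NOT NULL ARRAY    -- array of NOT NULL INTEGER, the array itself nullable
    INT ARRAY NOT NULL    -- array of nullable INTEGER, the array itself NOT NULL
    ARRAY<INT NOT NULL>   -- element nullability as in the first case, angle-bracket form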
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlBytesType.java
similarity index 55%
copy from flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
copy to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlBytesType.java
index 588c993..dfc2d1f 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlBytesType.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,40 +18,21 @@
 
 package org.apache.flink.sql.parser.type;
 
-import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
- * Parse column of Map type.
+ * Parse the type "BYTES", which is a synonym for VARBINARY(INT_MAX).
  */
-public class SqlMapType extends SqlIdentifier implements ExtendedSqlType {
+public class SqlBytesType extends SqlIdentifier implements ExtendedSqlType {
 
-	private final SqlDataTypeSpec keyType;
-	private final SqlDataTypeSpec valType;
-
-	public SqlMapType(SqlParserPos pos, SqlDataTypeSpec keyType, SqlDataTypeSpec valType) {
-		super(SqlTypeName.MAP.getName(), pos);
-		this.keyType = keyType;
-		this.valType = valType;
-	}
-
-	public SqlDataTypeSpec getKeyType() {
-		return keyType;
-	}
-
-	public SqlDataTypeSpec getValType() {
-		return valType;
+	public SqlBytesType(SqlParserPos pos) {
+		super("BYTES", pos);
 	}
 
 	@Override
 	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-		writer.keyword("MAP<");
-		ExtendedSqlType.unparseType(keyType, writer, leftPrec, rightPrec);
-		writer.sep(",");
-		ExtendedSqlType.unparseType(valType, writer, leftPrec, rightPrec);
-		writer.keyword(">");
+		writer.keyword("BYTES");
 	}
 }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
index 588c993..fd071c0 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
@@ -48,10 +48,12 @@ public class SqlMapType extends SqlIdentifier implements ExtendedSqlType {
 
 	@Override
 	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-		writer.keyword("MAP<");
+		writer.keyword("MAP");
+		SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">");
+		writer.sep(",");
 		ExtendedSqlType.unparseType(keyType, writer, leftPrec, rightPrec);
 		writer.sep(",");
 		ExtendedSqlType.unparseType(valType, writer, leftPrec, rightPrec);
-		writer.keyword(">");
+		writer.endList(frame);
 	}
 }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMultisetType.java
similarity index 64%
copy from flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
copy to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMultisetType.java
index 588c993..3b6f19c 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMultisetType.java
@@ -25,33 +25,25 @@ import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
- * Parse column of Map type.
+ * Parse column of MULTISET type.
  */
-public class SqlMapType extends SqlIdentifier implements ExtendedSqlType {
+public class SqlMultisetType extends SqlIdentifier implements ExtendedSqlType {
 
-	private final SqlDataTypeSpec keyType;
-	private final SqlDataTypeSpec valType;
+	private final SqlDataTypeSpec elementType;
 
-	public SqlMapType(SqlParserPos pos, SqlDataTypeSpec keyType, SqlDataTypeSpec valType) {
-		super(SqlTypeName.MAP.getName(), pos);
-		this.keyType = keyType;
-		this.valType = valType;
+	public SqlMultisetType(SqlParserPos pos, SqlDataTypeSpec elementType) {
+		super(SqlTypeName.MULTISET.getName(), pos);
+		this.elementType = elementType;
 	}
 
-	public SqlDataTypeSpec getKeyType() {
-		return keyType;
-	}
-
-	public SqlDataTypeSpec getValType() {
-		return valType;
+	public SqlDataTypeSpec getElementType() {
+		return elementType;
 	}
 
 	@Override
 	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-		writer.keyword("MAP<");
-		ExtendedSqlType.unparseType(keyType, writer, leftPrec, rightPrec);
-		writer.sep(",");
-		ExtendedSqlType.unparseType(valType, writer, leftPrec, rightPrec);
+		writer.keyword("MULTISET<");
+		ExtendedSqlType.unparseType(this.elementType, writer, leftPrec, rightPrec);
 		writer.keyword(">");
 	}
 }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlRowType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlRowType.java
index e77529e..886125c 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlRowType.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlRowType.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.sql.parser.type;
 
+import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlWriter;
@@ -34,13 +35,16 @@ public class SqlRowType extends SqlIdentifier implements ExtendedSqlType {
 
 	private final List<SqlIdentifier> fieldNames;
 	private final List<SqlDataTypeSpec> fieldTypes;
+	private final List<SqlCharStringLiteral> comments;
 
 	public SqlRowType(SqlParserPos pos,
 			List<SqlIdentifier> fieldNames,
-			List<SqlDataTypeSpec> fieldTypes) {
+			List<SqlDataTypeSpec> fieldTypes,
+			List<SqlCharStringLiteral> comments) {
 		super(SqlTypeName.ROW.getName(), pos);
 		this.fieldNames = fieldNames;
 		this.fieldTypes = fieldTypes;
+		this.comments = comments;
 	}
 
 	public List<SqlIdentifier> getFieldNames() {
@@ -51,6 +55,10 @@ public class SqlRowType extends SqlIdentifier implements ExtendedSqlType {
 		return fieldTypes;
 	}
 
+	public List<SqlCharStringLiteral> getComments() {
+		return comments;
+	}
+
 	public int getArity() {
 		return fieldNames.size();
 	}
@@ -65,14 +73,22 @@ public class SqlRowType extends SqlIdentifier implements ExtendedSqlType {
 
 	@Override
 	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-		writer.keyword("ROW");
-		SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">");
-		for (Pair<SqlIdentifier, SqlDataTypeSpec> p : Pair.zip(this.fieldNames, this.fieldTypes)) {
-			writer.sep(",", false);
-			p.left.unparse(writer, 0, 0);
-			writer.sep(":");
-			ExtendedSqlType.unparseType(p.right, writer, leftPrec, rightPrec);
+		writer.print("ROW");
+		if (getFieldNames().size() == 0) {
+			writer.print("<>");
+		} else {
+			SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">");
+			int i = 0;
+			for (Pair<SqlIdentifier, SqlDataTypeSpec> p : Pair.zip(this.fieldNames, this.fieldTypes)) {
+				writer.sep(",", false);
+				p.left.unparse(writer, 0, 0);
+				ExtendedSqlType.unparseType(p.right, writer, leftPrec, rightPrec);
+				if (comments.get(i) != null) {
+					comments.get(i).unparse(writer, leftPrec, rightPrec);
+				}
+				i += 1;
+			}
+			writer.endList(frame);
 		}
-		writer.endList(frame);
 	}
 }
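
With the comments threaded through, unparsing reproduces them inline; the
expected output in FlinkDDLDataTypeTest below is, for example:

    ROW< `f0` INTEGER NOT NULL 'This is a comment.', `f1` BOOLEAN 'This as well.' >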
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlStringType.java
similarity index 55%
copy from flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
copy to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlStringType.java
index 588c993..a134b13 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlStringType.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,40 +18,21 @@
 
 package org.apache.flink.sql.parser.type;
 
-import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
- * Parse column of Map type.
+ * Parse the type "STRING", which is a synonym for VARCHAR(INT_MAX).
  */
-public class SqlMapType extends SqlIdentifier implements ExtendedSqlType {
+public class SqlStringType extends SqlIdentifier implements ExtendedSqlType {
 
-	private final SqlDataTypeSpec keyType;
-	private final SqlDataTypeSpec valType;
-
-	public SqlMapType(SqlParserPos pos, SqlDataTypeSpec keyType, SqlDataTypeSpec valType) {
-		super(SqlTypeName.MAP.getName(), pos);
-		this.keyType = keyType;
-		this.valType = valType;
-	}
-
-	public SqlDataTypeSpec getKeyType() {
-		return keyType;
-	}
-
-	public SqlDataTypeSpec getValType() {
-		return valType;
+	public SqlStringType(SqlParserPos pos) {
+		super("STRING", pos);
 	}
 
 	@Override
 	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-		writer.keyword("MAP<");
-		ExtendedSqlType.unparseType(keyType, writer, leftPrec, rightPrec);
-		writer.sep(",");
-		ExtendedSqlType.unparseType(valType, writer, leftPrec, rightPrec);
-		writer.keyword(">");
+		writer.keyword("STRING");
 	}
 }
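
Together with SqlBytesType above, the two synonyms expand to the
maximum-length variable-width types (Integer.MAX_VALUE = 2147483647):

    f0 STRING    -- VARCHAR(2147483647)
    f1 BYTES     -- VARBINARY(2147483647)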
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimeType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimeType.java
new file mode 100644
index 0000000..23855ce
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimeType.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.type;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Parse type "TIME WITHOUT TIME ZONE", "TIME(3) WITHOUT TIME ZONE", "TIME WITH LOCAL TIME ZONE",
+ * or "TIME(3) WITH LOCAL TIME ZONE".
+ */
+public class SqlTimeType extends SqlIdentifier implements ExtendedSqlType {
+	private final int precision;
+	private final boolean withLocalTimeZone;
+
+	public SqlTimeType(SqlParserPos pos, int precision, boolean withLocalTimeZone) {
+		super(getTypeName(withLocalTimeZone), pos);
+		this.precision = precision;
+		this.withLocalTimeZone = withLocalTimeZone;
+	}
+
+	private static String getTypeName(boolean withLocalTimeZone) {
+		if (withLocalTimeZone) {
+			return SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE.name();
+		} else {
+			return SqlTypeName.TIME.name();
+		}
+	}
+
+	public SqlTypeName getSqlTypeName() {
+		if (withLocalTimeZone) {
+			return SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE;
+		} else {
+			return SqlTypeName.TIME;
+		}
+	}
+
+	public int getPrecision() {
+		return this.precision;
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		writer.keyword(SqlTypeName.TIME.name());
+		if (this.precision >= 0) {
+			final SqlWriter.Frame frame =
+				writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "(", ")");
+			writer.print(precision);
+			writer.endList(frame);
+		}
+		if (this.withLocalTimeZone) {
+			writer.keyword("WITH LOCAL TIME ZONE");
+		}
+	}
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimestampType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimestampType.java
new file mode 100644
index 0000000..09e2d08
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimestampType.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.type;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Parse type "TIMESTAMP WITHOUT TIME ZONE", "TIMESTAMP(3) WITHOUT TIME ZONE",
+ * "TIMESTAMP WITH LOCAL TIME ZONE", or "TIMESTAMP(3) WITH LOCAL TIME ZONE".
+ */
+public class SqlTimestampType extends SqlIdentifier implements ExtendedSqlType {
+	private final int precision;
+	private final boolean withLocalTimeZone;
+
+	public SqlTimestampType(SqlParserPos pos, int precision, boolean withLocalTimeZone) {
+		super(getTypeName(withLocalTimeZone), pos);
+		this.precision = precision;
+		this.withLocalTimeZone = withLocalTimeZone;
+	}
+
+	private static String getTypeName(boolean withLocalTimeZone) {
+		if (withLocalTimeZone) {
+			return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE.name();
+		} else {
+			return SqlTypeName.TIMESTAMP.name();
+		}
+	}
+
+	public SqlTypeName getSqlTypeName() {
+		if (withLocalTimeZone) {
+			return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+		} else {
+			return SqlTypeName.TIMESTAMP;
+		}
+	}
+
+	public int getPrecision() {
+		return this.precision;
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		writer.keyword(SqlTypeName.TIMESTAMP.name());
+		if (this.precision >= 0) {
+			final SqlWriter.Frame frame =
+				writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "(", ")");
+			writer.print(precision);
+			writer.endList(frame);
+		}
+		if (this.withLocalTimeZone) {
+			writer.keyword("WITH LOCAL TIME ZONE");
+		}
+	}
+}
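
The forms accepted for the two time types, per the WithLocalTimeZone() rule in
the parser templates (a plain 'WITH TIME ZONE' is rejected with a parse error):

    TIME(3)
    TIME WITHOUT TIME ZONE
    TIMESTAMP(3) WITH LOCAL TIME ZONE
    TIMESTAMP WITH LOCAL TIME ZONE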
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java
new file mode 100644
index 0000000..671958e
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.List;
+
+/**
+ * Type fixture used during tests.
+ */
+public class Fixture {
+	private final RelDataTypeFactory typeFactory;
+
+	final RelDataType char1Type;
+	final RelDataType char33Type;
+	final RelDataType varcharType;
+	final RelDataType varchar33Type;
+	final RelDataType booleanType;
+	final RelDataType binaryType;
+	final RelDataType binary33Type;
+	final RelDataType varbinaryType;
+	final RelDataType varbinary33Type;
+	final RelDataType decimalType;
+	final RelDataType decimalP10S0Type;
+	final RelDataType decimalP10S3Type;
+	final RelDataType tinyintType;
+	final RelDataType smallintType;
+	final RelDataType intType;
+	final RelDataType bigintType;
+	final RelDataType floatType;
+	final RelDataType doubleType;
+	final RelDataType dateType;
+	final RelDataType timeType;
+	final RelDataType time3Type;
+	final RelDataType timestampType;
+	final RelDataType timestamp3Type;
+	final RelDataType timestampWithLocalTimeZoneType;
+	final RelDataType timestamp3WithLocalTimeZoneType;
+	final RelDataType nullType;
+
+	Fixture(RelDataTypeFactory typeFactory) {
+		this.typeFactory = typeFactory;
+		this.char1Type = typeFactory.createSqlType(SqlTypeName.CHAR);
+		this.char33Type = typeFactory.createSqlType(SqlTypeName.CHAR, 33);
+		this.varcharType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
+		this.varchar33Type = typeFactory.createSqlType(SqlTypeName.VARCHAR, 33);
+		this.booleanType = typeFactory.createSqlType(SqlTypeName.BOOLEAN);
+		this.binaryType = typeFactory.createSqlType(SqlTypeName.BINARY);
+		this.binary33Type = typeFactory.createSqlType(SqlTypeName.BINARY, 33);
+		this.varbinaryType = typeFactory.createSqlType(SqlTypeName.VARBINARY);
+		this.varbinary33Type = typeFactory.createSqlType(SqlTypeName.VARBINARY, 33);
+		this.decimalType = typeFactory.createSqlType(SqlTypeName.DECIMAL);
+		this.decimalP10S0Type = typeFactory.createSqlType(SqlTypeName.DECIMAL, 10);
+		this.decimalP10S3Type = typeFactory.createSqlType(SqlTypeName.DECIMAL, 10, 3);
+		this.tinyintType = typeFactory.createSqlType(SqlTypeName.TINYINT);
+		this.smallintType = typeFactory.createSqlType(SqlTypeName.SMALLINT);
+		this.intType = typeFactory.createSqlType(SqlTypeName.INTEGER);
+		this.bigintType = typeFactory.createSqlType(SqlTypeName.BIGINT);
+		this.floatType = typeFactory.createSqlType(SqlTypeName.FLOAT);
+		this.doubleType = typeFactory.createSqlType(SqlTypeName.DOUBLE);
+		this.dateType = typeFactory.createSqlType(SqlTypeName.DATE);
+		this.timeType = typeFactory.createSqlType(SqlTypeName.TIME);
+		this.time3Type = typeFactory.createSqlType(SqlTypeName.TIME, 3);
+		this.timestampType = typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+		this.timestamp3Type = typeFactory.createSqlType(SqlTypeName.TIMESTAMP, 3);
+		this.timestampWithLocalTimeZoneType =
+			typeFactory.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+		this.timestamp3WithLocalTimeZoneType =
+			typeFactory.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3);
+		this.nullType = typeFactory.createSqlType(SqlTypeName.NULL);
+	}
+
+	public RelDataType createSqlType(SqlTypeName sqlTypeName, int precision) {
+		return typeFactory.createSqlType(sqlTypeName, precision);
+	}
+
+	public RelDataType createArrayType(RelDataType elementType) {
+		return typeFactory.createArrayType(elementType, -1);
+	}
+
+	public RelDataType createMultisetType(RelDataType elementType) {
+		return typeFactory.createMultisetType(elementType, -1);
+	}
+
+	public RelDataType createMapType(RelDataType keyType, RelDataType valType) {
+		return typeFactory.createMapType(keyType, valType);
+	}
+
+	public RelDataType createStructType(List<RelDataType> keyTypes, List<String> names) {
+		return typeFactory.createStructType(keyTypes, names);
+	}
+
+	public RelDataType nullable(RelDataType type) {
+		return typeFactory.createTypeWithNullability(type, true);
+	}
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
new file mode 100644
index 0000000..4b4499f
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
+import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
+
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.avatica.util.Quoting;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.dialect.CalciteSqlDialect;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.parser.SqlParserTest;
+import org.apache.calcite.sql.parser.SqlParserUtil;
+import org.apache.calcite.sql.pretty.SqlPrettyWriter;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.test.SqlValidatorTestCase;
+import org.apache.calcite.util.Util;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for all the supported Flink DDL data types.
+ */
+@RunWith(Parameterized.class)
+public class FlinkDDLDataTypeTest {
+	private FlinkSqlConformance conformance = FlinkSqlConformance.DEFAULT;
+	private static final RelDataTypeFactory TYPE_FACTORY =
+		new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+	private static final Fixture FIXTURE = new Fixture(TYPE_FACTORY);
+	private static final String DDL_FORMAT = "create table t1 (\n" +
+		"  f0 %s\n" +
+		") with (\n" +
+		"  k1 = 'v1'\n" +
+		")";
+
+	@Parameterized.Parameters(name = "{index}: {0}")
+	public static List<TestItem> testData() {
+		return Arrays.asList(
+			createTestItem("CHAR", nullable(FIXTURE.char1Type), "CHAR"),
+			createTestItem("CHAR NOT NULL", FIXTURE.char1Type, "CHAR NOT NULL"),
+			createTestItem("CHAR   NOT \t\nNULL", FIXTURE.char1Type, "CHAR NOT NULL"),
+			createTestItem("char not null", FIXTURE.char1Type, "CHAR NOT NULL"),
+			createTestItem("CHAR NULL", nullable(FIXTURE.char1Type), "CHAR"),
+			createTestItem("CHAR(33)", nullable(FIXTURE.char33Type), "CHAR(33)"),
+			createTestItem("VARCHAR", nullable(FIXTURE.varcharType), "VARCHAR"),
+			createTestItem("VARCHAR(33)", nullable(FIXTURE.varchar33Type), "VARCHAR(33)"),
+			createTestItem("STRING",
+				nullable(FIXTURE.createSqlType(SqlTypeName.VARCHAR, Integer.MAX_VALUE)), "STRING"),
+			createTestItem("BOOLEAN", nullable(FIXTURE.booleanType), "BOOLEAN"),
+			createTestItem("BINARY", nullable(FIXTURE.binaryType), "BINARY"),
+			createTestItem("BINARY(33)", nullable(FIXTURE.binary33Type), "BINARY(33)"),
+			createTestItem("VARBINARY", nullable(FIXTURE.varbinaryType), "VARBINARY"),
+			createTestItem("VARBINARY(33)", nullable(FIXTURE.varbinary33Type),
+				"VARBINARY(33)"),
+			createTestItem("BYTES",
+				nullable(FIXTURE.createSqlType(SqlTypeName.VARBINARY, Integer.MAX_VALUE)),
+				"BYTES"),
+			createTestItem("DECIMAL", nullable(FIXTURE.decimalType), "DECIMAL"),
+			createTestItem("DEC", nullable(FIXTURE.decimalType), "DECIMAL"),
+			createTestItem("NUMERIC", nullable(FIXTURE.decimalType), "DECIMAL"),
+			createTestItem("DECIMAL(10)", nullable(FIXTURE.decimalP10S0Type), "DECIMAL(10)"),
+			createTestItem("DEC(10)", nullable(FIXTURE.decimalP10S0Type), "DECIMAL(10)"),
+			createTestItem("NUMERIC(10)", nullable(FIXTURE.decimalP10S0Type), "DECIMAL(10)"),
+			createTestItem("DECIMAL(10, 3)", nullable(FIXTURE.decimalP10S3Type),
+				"DECIMAL(10, 3)"),
+			createTestItem("DEC(10, 3)", nullable(FIXTURE.decimalP10S3Type),
+				"DECIMAL(10, 3)"),
+			createTestItem("NUMERIC(10, 3)", nullable(FIXTURE.decimalP10S3Type),
+				"DECIMAL(10, 3)"),
+			createTestItem("TINYINT", nullable(FIXTURE.tinyintType), "TINYINT"),
+			createTestItem("SMALLINT", nullable(FIXTURE.smallintType), "SMALLINT"),
+			createTestItem("INTEGER", nullable(FIXTURE.intType), "INTEGER"),
+			createTestItem("INT", nullable(FIXTURE.intType), "INTEGER"),
+			createTestItem("BIGINT", nullable(FIXTURE.bigintType), "BIGINT"),
+			createTestItem("FLOAT", nullable(FIXTURE.floatType), "FLOAT"),
+			createTestItem("DOUBLE", nullable(FIXTURE.doubleType), "DOUBLE"),
+			createTestItem("DOUBLE PRECISION", nullable(FIXTURE.doubleType), "DOUBLE"),
+			createTestItem("DATE", nullable(FIXTURE.dateType), "DATE"),
+			createTestItem("TIME", nullable(FIXTURE.timeType), "TIME"),
+			createTestItem("TIME WITHOUT TIME ZONE", nullable(FIXTURE.timeType), "TIME"),
+			createTestItem("TIME(3)", nullable(FIXTURE.time3Type), "TIME(3)"),
+			createTestItem("TIME(3) WITHOUT TIME ZONE", nullable(FIXTURE.time3Type),
+				"TIME(3)"),
+			createTestItem("TIMESTAMP", nullable(FIXTURE.timestampType), "TIMESTAMP"),
+			createTestItem("TIMESTAMP WITHOUT TIME ZONE", nullable(FIXTURE.timestampType),
+				"TIMESTAMP"),
+			createTestItem("TIMESTAMP(3)", nullable(FIXTURE.timestamp3Type), "TIMESTAMP(3)"),
+			createTestItem("TIMESTAMP(3) WITHOUT TIME ZONE",
+				nullable(FIXTURE.timestamp3Type), "TIMESTAMP(3)"),
+			createTestItem("TIMESTAMP WITH LOCAL TIME ZONE",
+				nullable(FIXTURE.timestampWithLocalTimeZoneType),
+				"TIMESTAMP WITH LOCAL TIME ZONE"),
+			createTestItem("TIMESTAMP(3) WITH LOCAL TIME ZONE",
+				nullable(FIXTURE.timestamp3WithLocalTimeZoneType),
+				"TIMESTAMP(3) WITH LOCAL TIME ZONE"),
+			createTestItem("ARRAY<TIMESTAMP(3) WITH LOCAL TIME ZONE>",
+				nullable(FIXTURE.createArrayType(nullable(FIXTURE.timestamp3WithLocalTimeZoneType))),
+				"ARRAY< TIMESTAMP(3) WITH LOCAL TIME ZONE >"),
+			createTestItem("ARRAY<INT NOT NULL>",
+				nullable(FIXTURE.createArrayType(FIXTURE.intType)),
+				"ARRAY< INTEGER NOT NULL >"),
+			createTestItem("INT ARRAY",
+				nullable(FIXTURE.createArrayType(nullable(FIXTURE.intType))),
+				"INTEGER ARRAY"),
+			createTestItem("INT NOT NULL ARRAY",
+				nullable(FIXTURE.createArrayType(FIXTURE.intType)),
+				"INTEGER NOT NULL ARRAY"),
+			createTestItem("INT ARRAY NOT NULL",
+				FIXTURE.createArrayType(nullable(FIXTURE.intType)),
+				"INTEGER ARRAY NOT NULL"),
+			createTestItem("MULTISET<INT NOT NULL>",
+				nullable(FIXTURE.createMultisetType(FIXTURE.intType)),
+				"MULTISET< INTEGER NOT NULL >"),
+			createTestItem("INT MULTISET",
+				nullable(FIXTURE.createMultisetType(nullable(FIXTURE.intType))),
+				"INTEGER MULTISET"),
+			createTestItem("INT NOT NULL MULTISET",
+				nullable(FIXTURE.createMultisetType(FIXTURE.intType)),
+				"INTEGER NOT NULL MULTISET"),
+			createTestItem("INT MULTISET NOT NULL",
+				FIXTURE.createMultisetType(nullable(FIXTURE.intType)),
+				"INTEGER MULTISET NOT NULL"),
+			createTestItem("MAP<BIGINT, BOOLEAN>",
+				nullable(FIXTURE.createMapType(
+					nullable(FIXTURE.bigintType),
+					nullable(FIXTURE.booleanType))),
+				"MAP< BIGINT, BOOLEAN >"),
+			createTestItem("ROW<f0 INT NOT NULL, f1 BOOLEAN>",
+				nullable(FIXTURE.createStructType(
+					Arrays.asList(FIXTURE.intType, nullable(FIXTURE.booleanType)),
+					Arrays.asList("f0", "f1"))),
+				"ROW< `f0` INTEGER NOT NULL, `f1` BOOLEAN >"),
+			createTestItem("ROW(f0 INT NOT NULL, f1 BOOLEAN)",
+				nullable(FIXTURE.createStructType(
+					Arrays.asList(FIXTURE.intType, nullable(FIXTURE.booleanType)),
+					Arrays.asList("f0", "f1"))),
+				"ROW< `f0` INTEGER NOT NULL, `f1` BOOLEAN >"),
+			createTestItem("ROW<`f0` INT>",
+				nullable(FIXTURE.createStructType(
+					Collections.singletonList(nullable(FIXTURE.intType)),
+					Collections.singletonList("f0"))),
+				"ROW< `f0` INTEGER >"),
+			createTestItem("ROW(`f0` INT)",
+				nullable(FIXTURE.createStructType(
+					Collections.singletonList(nullable(FIXTURE.intType)),
+					Collections.singletonList("f0"))),
+				"ROW< `f0` INTEGER >"),
+			createTestItem("ROW<>",
+				nullable(FIXTURE.createStructType(
+					Collections.emptyList(),
+					Collections.emptyList())),
+				"ROW<>"),
+			createTestItem("ROW()",
+				nullable(FIXTURE.createStructType(
+					Collections.emptyList(),
+					Collections.emptyList())),
+				"ROW<>"),
+			createTestItem("ROW<f0 INT NOT NULL 'This is a comment.', "
+					+ "f1 BOOLEAN 'This as well.'>",
+				nullable(FIXTURE.createStructType(
+					Arrays.asList(FIXTURE.intType, nullable(FIXTURE.booleanType)),
+					Arrays.asList("f0", "f1"))),
+				"ROW< `f0` INTEGER NOT NULL 'This is a comment.', "
+					+ "`f1` BOOLEAN 'This as well.' >"),
+
+			// Test items whose parsing is expected to fail with the given error pattern.
+			createTestItem("TIMESTAMP WITH TIME ZONE",
+				"'WITH TIME ZONE' is not supported yet, options: "
+					+ "'WITHOUT TIME ZONE', 'WITH LOCAL TIME ZONE'."),
+			createTestItem("TIMESTAMP(3) WITH TIME ZONE",
+				"'WITH TIME ZONE' is not supported yet, options: "
+					+ "'WITHOUT TIME ZONE', 'WITH LOCAL TIME ZONE'."),
+			createTestItem("^NULL^",
+				"(?s).*Encountered \"NULL\" at line 2, column 6..*"),
+			createTestItem("cat.db.MyType",
+				"(?s).*UDT in DDL is not supported yet..*"),
+			createTestItem("`db`.`MyType`",
+				"(?s).*UDT in DDL is not supported yet..*"),
+			createTestItem("MyType",
+				"(?s).*UDT in DDL is not supported yet..*"),
+			createTestItem("ARRAY<MyType>",
+				"(?s).*UDT in DDL is not supported yet..*"),
+			createTestItem("ROW<f0 MyType, f1 `c`.`d`.`t`>",
+				"(?s).*UDT in DDL is not supported yet..*"),
+			createTestItem("^INTERVAL^ YEAR",
+				"(?s).*Encountered \"INTERVAL\" at line 2, column 6..*"),
+			createTestItem("ANY(^'unknown.class'^, '')",
+				"(?s).*Encountered \"\\\\'unknown.class\\\\'\" at line 2, column 10.\n.*"
+					+ "Was expecting:\n"
+					+ "    <UNSIGNED_INTEGER_LITERAL> ...\n"
+					+ ".*"));
+	}
+
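+	/**
+	 * Creates a {@link TestItem} from varargs: {@code args[0]} is the type
+	 * expression, {@code args[1]} is either the expected {@link RelDataType}
+	 * or an expected error pattern, and the optional {@code args[2]} is the
+	 * expected unparsed SQL string.
+	 */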
+	private static TestItem createTestItem(Object... args) {
+		assert args.length >= 2;
+		final String testExpr = (String) args[0];
+		TestItem testItem = TestItem.fromTestExpr(testExpr);
+		if (args[1] instanceof String) {
+			testItem.withExpectedError((String) args[1]);
+		} else if (args[1] instanceof RelDataType) {
+			testItem.withExpectedType((RelDataType) args[1]);
+		}
+		if (args.length == 3) {
+			testItem.withExpectedUnparsed((String) args[2]);
+		}
+		return testItem;
+	}
+
+	@Parameterized.Parameter
+	public TestItem testItem;
+
+	@Test
+	public void testDataTypeParsing() {
+		if (testItem.expectedType != null) {
+			checkType(testItem.testExpr, testItem.expectedType);
+		}
+	}
+
+	@Test
+	public void testThrowsError() {
+		if (testItem.expectedError != null) {
+			checkFails(testItem.testExpr, testItem.expectedError);
+		}
+	}
+
+	@Test
+	public void testDataTypeUnparsing() {
+		if (testItem.expectedUnparsed != null) {
+			checkUnparseTo(testItem.testExpr, testItem.expectedUnparsed);
+		}
+	}
+
+	private static RelDataType nullable(RelDataType type) {
+		return FIXTURE.nullable(type);
+	}
+
+	private void checkType(String typeExpr, RelDataType expectedType) {
+		this.sql(String.format(DDL_FORMAT, typeExpr)).checkType(expectedType);
+	}
+
+	private void checkFails(String typeExpr, String expectedMsgPattern) {
+		sql(String.format(DDL_FORMAT, typeExpr)).fails(expectedMsgPattern);
+	}
+
+	private void checkUnparseTo(String typeExpr, String expectedUnparsed) {
+		sql(String.format(DDL_FORMAT, typeExpr)).unparsedTo(expectedUnparsed);
+	}
+
+	private Tester getTester() {
+		return new TesterImpl();
+	}
+
+	private Sql sql(String sql) {
+		return new Sql(sql);
+	}
+
+	//~ Inner Classes ----------------------------------------------------------
+
+	private static class TestItem {
+		private final String testExpr;
+		@Nullable
+		private RelDataType expectedType;
+		@Nullable
+		private String expectedError;
+		@Nullable
+		private String expectedUnparsed;
+
+		private TestItem(String testExpr) {
+			this.testExpr = testExpr;
+		}
+
+		static TestItem fromTestExpr(String testExpr) {
+			return new TestItem(testExpr);
+		}
+
+		TestItem withExpectedType(RelDataType expectedType) {
+			this.expectedType = expectedType;
+			return this;
+		}
+
+		TestItem withExpectedError(String expectedError) {
+			this.expectedError = expectedError;
+			return this;
+		}
+
+		TestItem withExpectedUnparsed(String expectedUnparsed) {
+			this.expectedUnparsed = expectedUnparsed;
+			return this;
+		}
+
+		@Override
+		public String toString() {
+			return this.testExpr;
+		}
+	}
+
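+	/**
+	 * Fluent helper that runs the {@link Tester} checks for a single SQL string.
+	 */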
+	private class Sql {
+		private final String sql;
+
+		Sql(String sql) {
+			this.sql = sql;
+		}
+
+		public Sql checkType(RelDataType type) {
+			getTester().checkType(this.sql, type);
+			return this;
+		}
+
+		public Sql fails(String expectedMsgPattern) {
+			getTester().checkFails(this.sql, expectedMsgPattern);
+			return this;
+		}
+
+		public Sql unparsedTo(String expectedUnparsed) {
+			getTester().checkUnparsed(this.sql, expectedUnparsed);
+			return this;
+		}
+	}
+
+	/**
+	 * Callback to control how test actions are performed.
+	 */
+	protected interface Tester {
+		void checkType(String sql, RelDataType type);
+
+		void checkFails(String sql, String expectedMsgPattern);
+
+		void checkUnparsed(String sql, String expectedUnparsed);
+	}
+
+	/**
+	 * Default implementation of {@link Tester}.
+	 */
+	protected class TesterImpl implements Tester {
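+		// Builds a parser backed by the Flink parser factory, with backtick
+		// quoting and case-preserving identifier handling.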
+		private SqlParser getSqlParser(String sql) {
+			return SqlParser.create(sql,
+				SqlParser.configBuilder()
+					.setParserFactory(FlinkSqlParserImpl.FACTORY)
+					.setQuoting(Quoting.BACK_TICK)
+					.setUnquotedCasing(Casing.UNCHANGED)
+					.setQuotedCasing(Casing.UNCHANGED)
+					.setConformance(conformance)
+					.build());
+		}
+
+		private SqlDialect getSqlDialect() {
+			return new CalciteSqlDialect(SqlDialect.EMPTY_CONTEXT
+				.withQuotedCasing(Casing.UNCHANGED)
+				.withConformance(conformance)
+				.withUnquotedCasing(Casing.UNCHANGED)
+				.withIdentifierQuoteString("`"));
+		}
+
+		public void checkType(String sql, RelDataType type) {
+			final SqlNode sqlNode = parseStmtAndHandleEx(sql);
+			assert sqlNode instanceof SqlCreateTable;
+			final SqlCreateTable sqlCreateTable = (SqlCreateTable) sqlNode;
+			SqlNodeList columns = sqlCreateTable.getColumnList();
+			assert columns.size() == 1;
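+			// The DDL template presumably wraps the type expression into a
+			// single-column CREATE TABLE, so the column's type spec carries
+			// the type under test.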
+			RelDataType columnType = ((SqlTableColumn) columns.get(0)).getType()
+				.deriveType(TYPE_FACTORY);
+			assertEquals(type, columnType);
+		}
+
+		private SqlNode parseStmtAndHandleEx(String sql) {
+			final SqlNode sqlNode;
+			try {
+				sqlNode = getSqlParser(sql).parseStmt();
+			} catch (SqlParseException e) {
+				throw new RuntimeException("Error while parsing SQL: " + sql, e);
+			}
+			return sqlNode;
+		}
+
+		public void checkFails(
+			String sql,
+			String expectedMsgPattern) {
+			SqlParserUtil.StringAndPos sap = SqlParserUtil.findPos(sql);
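+			// findPos strips the ^...^ markers and records the expected error position.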
+			Throwable thrown = null;
+			try {
+				final SqlNode sqlNode;
+				sqlNode = getSqlParser(sap.sql).parseStmt();
+				Util.discard(sqlNode);
+			} catch (Throwable ex) {
+				thrown = ex;
+			}
+
+			checkEx(expectedMsgPattern, sap, thrown);
+		}
+
+		public void checkUnparsed(String sql, String expectedUnparsed) {
+			final SqlNode sqlNode = parseStmtAndHandleEx(sql);
+			assert sqlNode instanceof SqlCreateTable;
+			final SqlCreateTable sqlCreateTable = (SqlCreateTable) sqlNode;
+			SqlNodeList columns = sqlCreateTable.getColumnList();
+			assert columns.size() == 1;
+			SqlDataTypeSpec dataTypeSpec = ((SqlTableColumn) columns.get(0)).getType();
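+			// Unparse the type spec back to SQL text and compare it verbatim.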
+			SqlWriter sqlWriter = new SqlPrettyWriter(getSqlDialect(), false);
+			dataTypeSpec.unparse(sqlWriter, 0, 0);
+			assertEquals(expectedUnparsed, sqlWriter.toSqlString().getSql());
+		}
+
+		private void checkEx(String expectedMsgPattern,
+				SqlParserUtil.StringAndPos sap,
+				Throwable thrown) {
+			SqlValidatorTestCase.checkEx(thrown, expectedMsgPattern, sap);
+		}
+	}
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index f045817..cc892ab 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -366,7 +366,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 		check("CREATE TABLE tbl1 (\n" +
 			"  a ARRAY<bigint>, \n" +
 			"  b MAP<int, varchar>,\n" +
-			"  c ROW<cc0:int, cc1: float, cc2: varchar>,\n" +
+			"  c ROW<cc0 int, cc1 float, cc2 varchar>,\n" +
+			"  d MULTISET<varchar>,\n" +
 			"  PRIMARY KEY (a, b) \n" +
 			") with (\n" +
 			"  x = 'y', \n" +
@@ -374,28 +375,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			")\n", "CREATE TABLE `TBL1` (\n" +
 			"  `A`  ARRAY< BIGINT >,\n" +
 			"  `B`  MAP< INTEGER, VARCHAR >,\n" +
-			"  `C`  ROW< `CC0` : INTEGER, `CC1` : FLOAT, `CC2` : VARCHAR >,\n" +
-			"  PRIMARY KEY (`A`, `B`)\n" +
-			") WITH (\n" +
-			"  `X` = 'y',\n" +
-			"  `ASD` = 'data'\n" +
-			")");
-	}
-
-	@Test
-	public void testCreateTableWithDecimalType() {
-		check("CREATE TABLE tbl1 (\n" +
-			"  a decimal, \n" +
-			"  b decimal(10, 0),\n" +
-			"  c decimal(38, 38),\n" +
-			"  PRIMARY KEY (a, b) \n" +
-			") with (\n" +
-			"  x = 'y', \n" +
-			"  asd = 'data'\n" +
-			")\n", "CREATE TABLE `TBL1` (\n" +
-			"  `A`  DECIMAL,\n" +
-			"  `B`  DECIMAL(10, 0),\n" +
-			"  `C`  DECIMAL(38, 38),\n" +
+			"  `C`  ROW< `CC0` INTEGER, `CC1` FLOAT, `CC2` VARCHAR >,\n" +
+			"  `D`  MULTISET< VARCHAR >,\n" +
 			"  PRIMARY KEY (`A`, `B`)\n" +
 			") WITH (\n" +
 			"  `X` = 'y',\n" +
@@ -408,7 +389,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 		check("CREATE TABLE tbl1 (\n" +
 			"  a ARRAY<ARRAY<bigint>>, \n" +
 			"  b MAP<MAP<int, varchar>, ARRAY<varchar>>,\n" +
-			"  c ROW<cc0:ARRAY<int>, cc1: float, cc2: varchar>,\n" +
+			"  c ROW<cc0 ARRAY<int>, cc1 float, cc2 varchar>,\n" +
+			"  d MULTISET<ARRAY<int>>,\n" +
 			"  PRIMARY KEY (a, b) \n" +
 			") with (\n" +
 			"  x = 'y', \n" +
@@ -416,7 +398,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			")\n", "CREATE TABLE `TBL1` (\n" +
 			"  `A`  ARRAY< ARRAY< BIGINT > >,\n" +
 			"  `B`  MAP< MAP< INTEGER, VARCHAR >, ARRAY< VARCHAR > >,\n" +
-			"  `C`  ROW< `CC0` : ARRAY< INTEGER >, `CC1` : FLOAT, `CC2` : VARCHAR >,\n" +
+			"  `C`  ROW< `CC0` ARRAY< INTEGER >, `CC1` FLOAT, `CC2` VARCHAR >,\n" +
+			"  `D`  MULTISET< ARRAY< INTEGER > >,\n" +
 			"  PRIMARY KEY (`A`, `B`)\n" +
 			") WITH (\n" +
 			"  `X` = 'y',\n" +
@@ -437,7 +420,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 			")\n", "(?s).*Encountered \"\\(\" at line 4, column 14.\n" +
 			"Was expecting one of:\n" +
 			"    \"AS\" ...\n" +
-			"    \"CHARACTER\" ...\n" +
+			"    \"ARRAY\" ...\n" +
 			".*");
 	}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
index 132755f..077fdfd 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
@@ -70,7 +70,7 @@ class WindowAggregateITCase(mode: StateBackendMode)
     val sql =
       """
         |SELECT
-        |  string,
+        |  `string`,
         |  HOP_START(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),
         |  HOP_ROWTIME(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),
         |  COUNT(1),
@@ -79,7 +79,7 @@ class WindowAggregateITCase(mode: StateBackendMode)
         |  COUNT(DISTINCT `float`),
         |  concat_distinct_agg(name)
         |FROM T1
-        |GROUP BY string, HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND)
+        |GROUP BY `string`, HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND)
       """.stripMargin
 
     val sink = new TestingAppendSink
@@ -122,7 +122,7 @@ class WindowAggregateITCase(mode: StateBackendMode)
     val sql =
       """
         |SELECT
-        |  string,
+        |  `string`,
         |  SESSION_START(rowtime, INTERVAL '0.005' SECOND),
         |  SESSION_ROWTIME(rowtime, INTERVAL '0.005' SECOND),
         |  COUNT(1),
@@ -131,7 +131,7 @@ class WindowAggregateITCase(mode: StateBackendMode)
         |  SUM(`int`),
         |  COUNT(DISTINCT name)
         |FROM T1
-        |GROUP BY string, SESSION(rowtime, INTERVAL '0.005' SECOND)
+        |GROUP BY `string`, SESSION(rowtime, INTERVAL '0.005' SECOND)
       """.stripMargin
 
     val sink = new TestingAppendSink
@@ -171,7 +171,7 @@ class WindowAggregateITCase(mode: StateBackendMode)
     val sql =
       """
         |SELECT
-        |  string,
+        |  `string`,
         |  TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) as w_start,
         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND),
         |  COUNT(DISTINCT `long`),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 01a55f1..b332818 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -512,7 +512,7 @@ class TimeAttributesITCase extends AbstractTestBase {
       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
     val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
     tEnv.registerTable("T1", table)
-    val querySql = "select rowtime as ts, string as msg from T1"
+    val querySql = "select rowtime as ts, `string` as msg from T1"
 
     val results = tEnv.sqlQuery(querySql).toAppendStream[Pojo1]
     results.addSink(new StreamITCase.StringSink[Pojo1])