Posted to commits@flink.apache.org by ku...@apache.org on 2019/05/31 09:28:51 UTC

[flink] branch master updated: [FLINK-6962][table] Add sql parser module and support CREATE / DROP table

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3710bcb  [FLINK-6962][table] Add sql parser module and support CREATE / DROP table
3710bcb is described below

commit 3710bcbc195f9a6304bfb452a58bb4a96804cc24
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri May 31 17:28:39 2019 +0800

    [FLINK-6962][table] Add sql parser module and support CREATE / DROP table
    
    This closes #8548
---
 flink-table/flink-sql-parser/pom.xml               | 297 ++++++++++++
 .../flink-sql-parser/src/main/codegen/config.fmpp  |  41 ++
 .../src/main/codegen/data/Parser.tdd               | 430 ++++++++++++++++++
 .../main/codegen/includes/compoundIdentifier.ftl   |  34 ++
 .../src/main/codegen/includes/parserImpls.ftl      | 289 ++++++++++++
 .../org/apache/flink/sql/parser/SqlProperty.java   |  91 ++++
 .../flink/sql/parser/ddl/ExtendedSqlType.java      |  42 ++
 .../apache/flink/sql/parser/ddl/SqlArrayType.java  |  49 ++
 .../apache/flink/sql/parser/ddl/SqlColumnType.java |  62 +++
 .../flink/sql/parser/ddl/SqlCreateTable.java       | 320 +++++++++++++
 .../apache/flink/sql/parser/ddl/SqlDropTable.java  |  87 ++++
 .../apache/flink/sql/parser/ddl/SqlMapType.java    |  57 +++
 .../apache/flink/sql/parser/ddl/SqlRowType.java    |  78 ++++
 .../flink/sql/parser/ddl/SqlTableColumn.java       |  97 ++++
 .../flink/sql/parser/error/SqlParseException.java  |  60 +++
 .../apache/flink/sql/parser/utils/SqlTimeUnit.java |  49 ++
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 498 +++++++++++++++++++++
 .../flink/sql/parser/FlinkSqlUnParserTest.java     |  42 ++
 flink-table/flink-table-planner-blink/pom.xml      |   6 +
 .../table/sqlexec/SqlExecutableStatement.java      |  79 ++++
 flink-table/pom.xml                                |   1 +
 21 files changed, 2709 insertions(+)

diff --git a/flink-table/flink-sql-parser/pom.xml b/flink-table/flink-sql-parser/pom.xml
new file mode 100644
index 0000000..a4003cc
--- /dev/null
+++ b/flink-table/flink-sql-parser/pom.xml
@@ -0,0 +1,297 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<artifactId>flink-table</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.9-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>flink-sql-parser</artifactId>
+	<name>flink-sql-parser</name>
+
+	<packaging>jar</packaging>
+
+	<properties>
+		<!-- override parent pom -->
+		<test.excludedGroups/>
+		<calcite.version>1.19.0</calcite.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-guava</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.calcite</groupId>
+			<artifactId>calcite-core</artifactId>
+			<!-- When updating the Calcite version, make sure to update the dependency exclusions -->
+			<version>${calcite.version}</version>
+			<exclusions>
+				<!--
+
+				Dependencies that are not needed for how we use Calcite right now.
+
+				"mvn dependency:tree" as of Calcite 1.19:
+
+				[INFO] +- org.apache.calcite:calcite-core:jar:1.19.0:compile
+				[INFO] |  +- org.apache.calcite.avatica:avatica-core:jar:1.13.0:compile
+				[INFO] |  \- com.google.guava:guava:jar:19.0:compile
+
+				-->
+				<exclusion>
+					<groupId>org.apache.calcite.avatica</groupId>
+					<artifactId>avatica-metrics</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.google.protobuf</groupId>
+					<artifactId>protobuf-java</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.httpcomponents</groupId>
+					<artifactId>httpclient</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.httpcomponents</groupId>
+					<artifactId>httpcore</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.commons</groupId>
+					<artifactId>commons-dbcp2</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.esri.geometry</groupId>
+					<artifactId>esri-geometry-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.fasterxml.jackson.dataformat</groupId>
+					<artifactId>jackson-dataformat-yaml</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.yahoo.datasketches</groupId>
+					<artifactId>sketches-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>net.hydromatic</groupId>
+					<artifactId>aggdesigner-algorithm</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.fasterxml.jackson.core</groupId>
+					<artifactId>jackson-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.fasterxml.jackson.core</groupId>
+					<artifactId>jackson-databind</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.fasterxml.jackson.core</groupId>
+					<artifactId>jackson-annotations</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.jayway.jsonpath</groupId>
+					<artifactId>json-path</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>joda-time</groupId>
+					<artifactId>joda-time</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.calcite</groupId>
+					<artifactId>calcite-linq4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.codehaus.janino</groupId>
+					<artifactId>janino</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.codehaus.janino</groupId>
+					<artifactId>commons-compiler</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.google.code.findbugs</groupId>
+					<artifactId>jsr305</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.commons</groupId>
+					<artifactId>commons-lang3</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.calcite</groupId>
+			<artifactId>calcite-core</artifactId>
+			<version>${calcite.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>com.fasterxml.jackson.core</groupId>
+					<artifactId>jackson-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.fasterxml.jackson.core</groupId>
+					<artifactId>jackson-databind</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.fasterxml.jackson.core</groupId>
+					<artifactId>jackson-annotations</artifactId>
+				</exclusion>
+			</exclusions>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<!-- Extract parser grammar template from calcite-core.jar and put
+                     it under ${project.build.directory} where all freemarker templates are. -->
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<version>2.8</version>
+				<executions>
+					<execution>
+						<id>unpack-parser-template</id>
+						<phase>initialize</phase>
+						<goals>
+							<goal>unpack</goal>
+						</goals>
+						<configuration>
+							<artifactItems>
+								<artifactItem>
+									<groupId>org.apache.calcite</groupId>
+									<artifactId>calcite-core</artifactId>
+									<type>jar</type>
+									<overWrite>true</overWrite>
+									<outputDirectory>${project.build.directory}/</outputDirectory>
+									<includes>**/Parser.jj</includes>
+								</artifactItem>
+							</artifactItems>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<!-- adding fmpp code gen -->
+			<plugin>
+				<artifactId>maven-resources-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>copy-fmpp-resources</id>
+						<phase>initialize</phase>
+						<goals>
+							<goal>copy-resources</goal>
+						</goals>
+						<configuration>
+							<outputDirectory>${project.build.directory}/codegen</outputDirectory>
+							<resources>
+								<resource>
+									<directory>src/main/codegen</directory>
+									<filtering>false</filtering>
+								</resource>
+							</resources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>com.googlecode.fmpp-maven-plugin</groupId>
+				<artifactId>fmpp-maven-plugin</artifactId>
+				<version>1.0</version>
+				<dependencies>
+					<dependency>
+						<groupId>org.freemarker</groupId>
+						<artifactId>freemarker</artifactId>
+						<version>2.3.25-incubating</version>
+					</dependency>
+				</dependencies>
+				<executions>
+					<execution>
+						<id>generate-fmpp-sources</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>generate</goal>
+						</goals>
+						<configuration>
+							<cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
+							<outputDirectory>target/generated-sources</outputDirectory>
+							<templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.5</version>
+				<executions>
+					<execution>
+						<id>add-generated-sources</id>
+						<phase>process-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>${project.build.directory}/generated-sources</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>javacc-maven-plugin</artifactId>
+				<version>2.4</version>
+				<executions>
+					<execution>
+						<phase>generate-sources</phase>
+						<id>javacc</id>
+						<goals>
+							<goal>javacc</goal>
+						</goals>
+						<configuration>
+							<sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
+							<includes>
+								<include>**/Parser.jj</include>
+							</includes>
+							<!-- This must be kept synced with Apache Calcite. -->
+							<lookAhead>2</lookAhead>
+							<isStatic>false</isStatic>
+							<outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<forkCount>1</forkCount>
+					<reuseForks>false</reuseForks>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
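
The build above chains maven-dependency-plugin (unpack Parser.jj from calcite-core), FMPP (apply the Flink extensions from Parser.tdd), and JavaCC (generate the parser) to produce org.apache.flink.sql.parser.impl.FlinkSqlParserImpl. A minimal usage sketch, assuming the standard FACTORY field that Calcite's Parser.jj template generates for every parser implementation:

    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.parser.SqlParser;
    import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;

    public class ParserSketch {
        public static void main(String[] args) throws Exception {
            // Build a Calcite SqlParser backed by the generated Flink parser.
            SqlParser parser = SqlParser.create(
                "CREATE TABLE tbl1 (a INT, b VARCHAR)",
                SqlParser.configBuilder()
                    .setParserFactory(FlinkSqlParserImpl.FACTORY)
                    .build());
            SqlNode stmt = parser.parseStmt(); // a SqlCreateTable for this input
            System.out.println(stmt);
        }
    }
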
diff --git a/flink-table/flink-sql-parser/src/main/codegen/config.fmpp b/flink-table/flink-sql-parser/src/main/codegen/config.fmpp
new file mode 100644
index 0000000..1d5c8e7
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/codegen/config.fmpp
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is an FMPP (http://fmpp.sourceforge.net/) configuration file to
+# allow clients to extend Calcite's SQL parser to support application specific
+# SQL statements, literals or data types.
+#
+# Calcite's parser grammar file (Parser.jj) is written in javacc
+# (https://javacc.org/) with Freemarker (http://freemarker.org/) variables
+# to allow clients to:
+#   1. have custom parser implementation class and package name.
+#   2. insert new parser method implementations written in javacc to parse
+#      custom:
+#      a) SQL statements.
+#      b) literals.
+#      c) data types.
+#   3. add new keywords to support custom SQL constructs added as part of (2).
+#   4. add import statements needed by inserted custom parser implementations.
+#
+# Parser template file (Parser.jj) along with this file are packaged as
+# part of the calcite-core-<version>.jar under "codegen" directory.
+
+data: {
+  parser: tdd(../data/Parser.tdd)
+}
+
+freemarkerLinks: {
+  includes: includes/
+}
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
new file mode 100644
index 0000000..c026d8a
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -0,0 +1,430 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  # Generated parser implementation package and class name.
+  package: "org.apache.flink.sql.parser.impl",
+  class: "FlinkSqlParserImpl",
+
+  # List of additional classes and packages to import.
+  # Example. "org.apache.calcite.sql.*", "java.util.List".
+  imports: [
+    "org.apache.flink.sql.parser.ddl.SqlCreateTable",
+    "org.apache.flink.sql.parser.ddl.SqlDropTable"
+    "org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext",
+    "org.apache.flink.sql.parser.ddl.SqlTableColumn",
+    "org.apache.flink.sql.parser.ddl.SqlArrayType",
+    "org.apache.flink.sql.parser.ddl.SqlMapType",
+    "org.apache.flink.sql.parser.ddl.SqlRowType",
+    "org.apache.flink.sql.parser.utils.SqlTimeUnit",
+    "org.apache.flink.sql.parser.SqlProperty",
+    "org.apache.calcite.sql.SqlDrop",
+    "org.apache.calcite.sql.SqlCreate",
+    "java.util.List",
+    "java.util.ArrayList"
+  ]
+
+  # List of new keywords. Example: "DATABASES", "TABLES". If the keyword is not a reserved
+  # keyword, please also add it to 'nonReservedKeywords' section.
+  keywords: [
+    "COMMENT",
+    "PARTITIONED",
+    "IF",
+    "WATERMARK",
+    "ASCENDING",
+    "FROM_SOURCE",
+    "BOUNDED",
+    "DELAY"
+  ]
+
+  # List of keywords from "keywords" section that are not reserved.
+  nonReservedKeywords: [
+    "A"
+    "ABSENT"
+    "ABSOLUTE"
+    "ACTION"
+    "ADA"
+    "ADD"
+    "ADMIN"
+    "AFTER"
+    "ALWAYS"
+    "APPLY"
+    "ASC"
+    "ASSERTION"
+    "ASSIGNMENT"
+    "ATTRIBUTE"
+    "ATTRIBUTES"
+    "BEFORE"
+    "BERNOULLI"
+    "BREADTH"
+    "C"
+    "CASCADE"
+    "CATALOG"
+    "CATALOG_NAME"
+    "CENTURY"
+    "CHAIN"
+    "CHARACTER_SET_CATALOG"
+    "CHARACTER_SET_NAME"
+    "CHARACTER_SET_SCHEMA"
+    "CHARACTERISTICS"
+    "CHARACTERS"
+    "CLASS_ORIGIN"
+    "COBOL"
+    "COLLATION"
+    "COLLATION_CATALOG"
+    "COLLATION_NAME"
+    "COLLATION_SCHEMA"
+    "COLUMN_NAME"
+    "COMMAND_FUNCTION"
+    "COMMAND_FUNCTION_CODE"
+    "COMMITTED"
+    "CONDITION_NUMBER"
+    "CONDITIONAL"
+    "CONNECTION"
+    "CONNECTION_NAME"
+    "CONSTRAINT_CATALOG"
+    "CONSTRAINT_NAME"
+    "CONSTRAINT_SCHEMA"
+    "CONSTRAINTS"
+    "CONSTRUCTOR"
+    "CONTINUE"
+    "CURSOR_NAME"
+    "DATA"
+    "DATABASE"
+    "DATETIME_INTERVAL_CODE"
+    "DATETIME_INTERVAL_PRECISION"
+    "DECADE"
+    "DEFAULTS"
+    "DEFERRABLE"
+    "DEFERRED"
+    "DEFINED"
+    "DEFINER"
+    "DEGREE"
+    "DEPTH"
+    "DERIVED"
+    "DESC"
+    "DESCRIPTION"
+    "DESCRIPTOR"
+    "DIAGNOSTICS"
+    "DISPATCH"
+    "DOMAIN"
+    "DOW"
+    "DOY"
+    "DYNAMIC_FUNCTION"
+    "DYNAMIC_FUNCTION_CODE"
+    "ENCODING"
+    "EPOCH"
+    "ERROR"
+    "EXCEPTION"
+    "EXCLUDE"
+    "EXCLUDING"
+    "FINAL"
+    "FIRST"
+    "FOLLOWING"
+    "FORMAT"
+    "FORTRAN"
+    "FOUND"
+    "FRAC_SECOND"
+    "G"
+    "GENERAL"
+    "GENERATED"
+    "GEOMETRY"
+    "GO"
+    "GOTO"
+    "GRANTED"
+    "HIERARCHY"
+    "IMMEDIATE"
+    "IMMEDIATELY"
+    "IMPLEMENTATION"
+    "INCLUDING"
+    "INCREMENT"
+    "INITIALLY"
+    "INPUT"
+    "INSTANCE"
+    "INSTANTIABLE"
+    "INVOKER"
+    "ISODOW"
+    "ISOYEAR"
+    "ISOLATION"
+    "JAVA"
+    "JSON"
+    "JSON_TYPE"
+    "JSON_DEPTH"
+    "JSON_PRETTY"
+    "K"
+    "KEY"
+    "KEY_MEMBER"
+    "KEY_TYPE"
+    "LABEL"
+    "LAST"
+    "LENGTH"
+    "LEVEL"
+    "LIBRARY"
+    "LOCATOR"
+    "M"
+    "MAP"
+    "MATCHED"
+    "MAXVALUE"
+    "MICROSECOND"
+    "MESSAGE_LENGTH"
+    "MESSAGE_OCTET_LENGTH"
+    "MESSAGE_TEXT"
+    "MILLISECOND"
+    "MILLENNIUM"
+    "MINVALUE"
+    "MORE_"
+    "MUMPS"
+    "NAME"
+    "NAMES"
+    "NANOSECOND"
+    "NESTING"
+    "NORMALIZED"
+    "NULLABLE"
+    "NULLS"
+    "NUMBER"
+    "OBJECT"
+    "OCTETS"
+    "OPTION"
+    "OPTIONS"
+    "ORDERING"
+    "ORDINALITY"
+    "OTHERS"
+    "OUTPUT"
+    "OVERRIDING"
+    "PAD"
+    "PARAMETER_MODE"
+    "PARAMETER_NAME"
+    "PARAMETER_ORDINAL_POSITION"
+    "PARAMETER_SPECIFIC_CATALOG"
+    "PARAMETER_SPECIFIC_NAME"
+    "PARAMETER_SPECIFIC_SCHEMA"
+    "PARTIAL"
+    "PASCAL"
+    "PASSING"
+    "PASSTHROUGH"
+    "PAST"
+    "PATH"
+    "PLACING"
+    "PLAN"
+    "PLI"
+    "PRECEDING"
+    "PRESERVE"
+    "PRIOR"
+    "PRIVILEGES"
+    "PUBLIC"
+    "QUARTER"
+    "READ"
+    "RELATIVE"
+    "REPEATABLE"
+    "REPLACE"
+    "RESTART"
+    "RESTRICT"
+    "RETURNED_CARDINALITY"
+    "RETURNED_LENGTH"
+    "RETURNED_OCTET_LENGTH"
+    "RETURNED_SQLSTATE"
+    "RETURNING"
+    "ROLE"
+    "ROUTINE"
+    "ROUTINE_CATALOG"
+    "ROUTINE_NAME"
+    "ROUTINE_SCHEMA"
+    "ROW_COUNT"
+    "SCALAR"
+    "SCALE"
+    "SCHEMA"
+    "SCHEMA_NAME"
+    "SCOPE_CATALOGS"
+    "SCOPE_NAME"
+    "SCOPE_SCHEMA"
+    "SECTION"
+    "SECURITY"
+    "SELF"
+    "SEQUENCE"
+    "SERIALIZABLE"
+    "SERVER"
+    "SERVER_NAME"
+    "SESSION"
+    "SETS"
+    "SIMPLE"
+    "SIZE"
+    "SOURCE"
+    "SPACE"
+    "SPECIFIC_NAME"
+    "SQL_BIGINT"
+    "SQL_BINARY"
+    "SQL_BIT"
+    "SQL_BLOB"
+    "SQL_BOOLEAN"
+    "SQL_CHAR"
+    "SQL_CLOB"
+    "SQL_DATE"
+    "SQL_DECIMAL"
+    "SQL_DOUBLE"
+    "SQL_FLOAT"
+    "SQL_INTEGER"
+    "SQL_INTERVAL_DAY"
+    "SQL_INTERVAL_DAY_TO_HOUR"
+    "SQL_INTERVAL_DAY_TO_MINUTE"
+    "SQL_INTERVAL_DAY_TO_SECOND"
+    "SQL_INTERVAL_HOUR"
+    "SQL_INTERVAL_HOUR_TO_MINUTE"
+    "SQL_INTERVAL_HOUR_TO_SECOND"
+    "SQL_INTERVAL_MINUTE"
+    "SQL_INTERVAL_MINUTE_TO_SECOND"
+    "SQL_INTERVAL_MONTH"
+    "SQL_INTERVAL_SECOND"
+    "SQL_INTERVAL_YEAR"
+    "SQL_INTERVAL_YEAR_TO_MONTH"
+    "SQL_LONGVARBINARY"
+    "SQL_LONGVARNCHAR"
+    "SQL_LONGVARCHAR"
+    "SQL_NCHAR"
+    "SQL_NCLOB"
+    "SQL_NUMERIC"
+    "SQL_NVARCHAR"
+    "SQL_REAL"
+    "SQL_SMALLINT"
+    "SQL_TIME"
+    "SQL_TIMESTAMP"
+    "SQL_TINYINT"
+    "SQL_TSI_DAY"
+    "SQL_TSI_FRAC_SECOND"
+    "SQL_TSI_HOUR"
+    "SQL_TSI_MICROSECOND"
+    "SQL_TSI_MINUTE"
+    "SQL_TSI_MONTH"
+    "SQL_TSI_QUARTER"
+    "SQL_TSI_SECOND"
+    "SQL_TSI_WEEK"
+    "SQL_TSI_YEAR"
+    "SQL_VARBINARY"
+    "SQL_VARCHAR"
+    "STATE"
+    "STATEMENT"
+    "STRUCTURE"
+    "STYLE"
+    "SUBCLASS_ORIGIN"
+    "SUBSTITUTE"
+    "TABLE_NAME"
+    "TEMPORARY"
+    "TIES"
+    "TIMESTAMPADD"
+    "TIMESTAMPDIFF"
+    "TOP_LEVEL_COUNT"
+    "TRANSACTION"
+    "TRANSACTIONS_ACTIVE"
+    "TRANSACTIONS_COMMITTED"
+    "TRANSACTIONS_ROLLED_BACK"
+    "TRANSFORM"
+    "TRANSFORMS"
+    "TRIGGER_CATALOG"
+    "TRIGGER_NAME"
+    "TRIGGER_SCHEMA"
+    "TYPE"
+    "UNBOUNDED"
+    "UNCOMMITTED"
+    "UNCONDITIONAL"
+    "UNDER"
+    "UNNAMED"
+    "USAGE"
+    "USER_DEFINED_TYPE_CATALOG"
+    "USER_DEFINED_TYPE_CODE"
+    "USER_DEFINED_TYPE_NAME"
+    "USER_DEFINED_TYPE_SCHEMA"
+    "UTF8"
+    "UTF16"
+    "UTF32"
+    "VERSION"
+    "VIEW"
+    "WEEK"
+    "WRAPPER"
+    "WORK"
+    "WRITE"
+    "XML"
+    "ZONE"
+
+    # not in core, added in Flink
+    "PARTITIONED",
+    "IF",
+    "ASCENDING",
+    "FROM_SOURCE",
+    "BOUNDED",
+    "DELAY"
+  ]
+
+  # List of methods for parsing custom SQL statements.
+  # Return type of method implementation should be 'SqlNode'.
+  # Example: SqlShowDatabases(), SqlShowTables().
+  statementParserMethods: [
+  ]
+
+  # List of methods for parsing custom literals.
+  # Return type of method implementation should be "SqlNode".
+  # Example: ParseJsonLiteral().
+  literalParserMethods: [
+  ]
+
+  # List of methods for parsing custom data types.
+  # Return type of method implementation should be "SqlIdentifier".
+  # Example: SqlParseTimeStampZ().
+  dataTypeParserMethods: [
+    "SqlArrayType()",
+    "SqlMapType()",
+    "SqlRowType()"
+  ]
+
+  # List of methods for parsing builtin function calls.
+  # Return type of method implementation should be "SqlNode".
+  # Example: DateFunctionCall().
+  builtinFunctionCallMethods: [
+  ]
+
+  # List of methods for parsing extensions to "ALTER <scope>" calls.
+  # Each must accept arguments "(SqlParserPos pos, String scope)".
+  # Example: "SqlUploadJarNode"
+  alterStatementParserMethods: [
+  ]
+
+  # List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
+  # Each must accept arguments "(SqlParserPos pos, boolean replace)".
+  createStatementParserMethods: [
+    "SqlCreateTable"
+  ]
+
+  # List of methods for parsing extensions to "DROP" calls.
+  # Each must accept arguments "(Span s)".
+  dropStatementParserMethods: [
+    "SqlDropTable"
+  ]
+
+  # List of files in @includes directory that have parser method
+  # implementations for parsing custom SQL statements, literals or types
+  # given as part of "statementParserMethods", "literalParserMethods" or
+  # "dataTypeParserMethods".
+  implementationFiles: [
+    "parserImpls.ftl"
+  ]
+
+  # List of additional join types. Each is a method with no arguments.
+  # Example: LeftSemiJoin()
+  joinTypes: [
+  ]
+
+  includeCompoundIdentifier: true
+  includeBraces: true
+  includeAdditionalDeclarations: false
+}
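
Taken together, the keyword list and the dataTypeParserMethods above make DDL like the following parseable. The statement is illustrative (names are made up); note that WATERMARK, ASCENDING, FROM_SOURCE, BOUNDED and DELAY are declared as keywords here but no grammar rule consumes them yet in this commit.

    // Illustrative DDL exercising the new keywords and type productions.
    String ddl =
        "CREATE TABLE orders (\n"
        + "  id BIGINT,\n"
        + "  tags ARRAY<VARCHAR>,\n"
        + "  attrs MAP<VARCHAR, INT>,\n"
        + "  address ROW<city: VARCHAR, zip: INT>,\n"
        + "  PRIMARY KEY (id)\n"
        + ") COMMENT 'order table'\n"
        + "PARTITIONED BY (id)\n"
        + "WITH (\n"
        + "  connector = 'csv'\n"
        + ")";
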
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/compoundIdentifier.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/compoundIdentifier.ftl
new file mode 100644
index 0000000..70db3c2
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/compoundIdentifier.ftl
@@ -0,0 +1,34 @@
+<#--
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to you under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+-->
+
+<#--
+  Add implementations of additional parser statements, literals or
+  data types.
+
+  Example of SqlShowTables() implementation:
+  SqlNode SqlShowTables()
+  {
+    ...local variables...
+  }
+  {
+    <SHOW> <TABLES>
+    ...
+    {
+      return SqlShowTables(...)
+    }
+  }
+-->
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
new file mode 100644
index 0000000..92607d5
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -0,0 +1,289 @@
+<#--
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to you under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+-->
+
+void TableColumn(TableCreationContext context) :
+{
+}
+{
+    (
+        TableColumn2(context.columnList)
+    |
+        context.primaryKeyList = PrimaryKey()
+    |
+        UniqueKey(context.uniqueKeysList)
+    |
+        ComputedColumn(context)
+    )
+}
+
+void ComputedColumn(TableCreationContext context) :
+{
+    SqlNode identifier;
+    SqlNode expr;
+    boolean hidden = false;
+    SqlParserPos pos;
+}
+{
+    identifier = SimpleIdentifier() {pos = getPos();}
+    <AS>
+    expr = Expression(ExprContext.ACCEPT_SUB_QUERY) {
+        expr = SqlStdOperatorTable.AS.createCall(Span.of(identifier, expr).pos(), expr, identifier);
+        context.columnList.add(expr);
+    }
+}
+
+void TableColumn2(List<SqlNode> list) :
+{
+    SqlParserPos pos;
+    SqlIdentifier name;
+    SqlDataTypeSpec type;
+    SqlCharStringLiteral comment = null;
+}
+{
+    name = SimpleIdentifier()
+    type = DataType()
+    (
+        <NULL> { type = type.withNullable(true); }
+    |
+        <NOT> <NULL> { type = type.withNullable(false); }
+    |
+        { type = type.withNullable(true); }
+    )
+    [ <COMMENT> <QUOTED_STRING> {
+        String p = SqlParserUtil.parseString(token.image);
+        comment = SqlLiteral.createCharString(p, getPos());
+    }]
+    {
+        SqlTableColumn tableColumn = new SqlTableColumn(name, type, comment, getPos());
+        list.add(tableColumn);
+    }
+}
+
+SqlNodeList PrimaryKey() :
+{
+    List<SqlNode> pkList = new ArrayList<SqlNode>();
+
+    SqlParserPos pos;
+    SqlIdentifier columnName;
+}
+{
+    <PRIMARY> { pos = getPos(); } <KEY> <LPAREN>
+        columnName = SimpleIdentifier() { pkList.add(columnName); }
+        (<COMMA> columnName = SimpleIdentifier() { pkList.add(columnName); })*
+    <RPAREN>
+    {
+        return new SqlNodeList(pkList, pos.plus(getPos()));
+    }
+}
+
+void UniqueKey(List<SqlNodeList> list) :
+{
+    List<SqlNode> ukList = new ArrayList<SqlNode>();
+    SqlParserPos pos;
+    SqlIdentifier columnName;
+}
+{
+    <UNIQUE> { pos = getPos(); } <LPAREN>
+        columnName = SimpleIdentifier() { ukList.add(columnName); }
+        (<COMMA> columnName = SimpleIdentifier() { ukList.add(columnName); })*
+    <RPAREN>
+    {
+        SqlNodeList uk = new SqlNodeList(ukList, pos.plus(getPos()));
+        list.add(uk);
+    }
+}
+
+SqlNode PropertyValue() :
+{
+    SqlIdentifier key;
+    SqlNode value;
+    SqlParserPos pos;
+}
+{
+    key = CompoundIdentifier()
+    { pos = getPos(); }
+    <EQ> value = StringLiteral()
+    {
+        return new SqlProperty(key, value, getPos());
+    }
+}
+
+SqlCreate SqlCreateTable(Span s, boolean replace) :
+{
+    final SqlParserPos startPos = s.pos();
+    SqlIdentifier tableName;
+    SqlNodeList primaryKeyList = null;
+    List<SqlNodeList> uniqueKeysList = null;
+    SqlNodeList columnList = SqlNodeList.EMPTY;
+    SqlCharStringLiteral comment = null;
+
+    SqlNodeList propertyList = null;
+    SqlNodeList partitionColumns = null;
+    SqlParserPos pos = startPos;
+}
+{
+    <TABLE>
+
+    tableName = CompoundIdentifier()
+    [
+        <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
+        TableColumn(ctx)
+        (
+            <COMMA> TableColumn(ctx)
+        )*
+        {
+            pos = pos.plus(getPos());
+            columnList = new SqlNodeList(ctx.columnList, pos);
+            primaryKeyList = ctx.primaryKeyList;
+            uniqueKeysList = ctx.uniqueKeysList;
+        }
+        <RPAREN>
+    ]
+    [ <COMMENT> <QUOTED_STRING> {
+        String p = SqlParserUtil.parseString(token.image);
+        comment = SqlLiteral.createCharString(p, getPos());
+    }]
+    [
+        <PARTITIONED> <BY>
+            {
+                SqlNode column;
+                List<SqlNode> partitionKey = new ArrayList<SqlNode>();
+                pos = getPos();
+            }
+            <LPAREN>
+            [
+                column = SimpleIdentifier()
+                {
+                    partitionKey.add(column);
+                }
+                (
+                    <COMMA> column = SimpleIdentifier()
+                        {
+                            partitionKey.add(column);
+                        }
+                )*
+            ]
+            <RPAREN>
+            {
+                partitionColumns = new SqlNodeList(partitionKey, pos.plus(getPos()));
+            }
+    ]
+    [
+        <WITH>
+            {
+                SqlNode property;
+                List<SqlNode> proList = new ArrayList<SqlNode>();
+                pos = getPos();
+            }
+            <LPAREN>
+            [
+                property = PropertyValue()
+                {
+                    proList.add(property);
+                }
+                (
+                    <COMMA> property = PropertyValue()
+                    {
+                        proList.add(property);
+                    }
+                )*
+            ]
+            <RPAREN>
+        {  propertyList = new SqlNodeList(proList, pos.plus(getPos())); }
+    ]
+
+    {
+        return new SqlCreateTable(startPos.plus(getPos()),
+                tableName,
+                columnList,
+                primaryKeyList,
+                uniqueKeysList,
+                propertyList,
+                partitionColumns,
+                comment);
+    }
+}
+
+SqlDrop SqlDropTable(Span s, boolean replace) :
+{
+    SqlIdentifier tableName = null;
+    boolean ifExists = false;
+}
+{
+    <TABLE>
+
+    [<IF> <EXISTS> { ifExists = true; } ]
+
+    tableName = CompoundIdentifier()
+
+    {
+         return new SqlDropTable(s.pos(), tableName, ifExists);
+    }
+}
+
+SqlIdentifier SqlArrayType() :
+{
+    SqlParserPos pos;
+    SqlDataTypeSpec elementType;
+}
+{
+    <ARRAY> { pos = getPos(); }
+    <LT> elementType = DataType()
+    <GT>
+    {
+        return new SqlArrayType(pos, elementType);
+    }
+}
+
+SqlIdentifier SqlMapType() :
+{
+    SqlParserPos pos;
+    SqlDataTypeSpec keyType;
+    SqlDataTypeSpec valType;
+}
+{
+    <MAP> { pos = getPos(); }
+    <LT> keyType = DataType()
+    <COMMA> valType = DataType()
+    <GT>
+    {
+        return new SqlMapType(pos, keyType, valType);
+    }
+}
+
+SqlIdentifier SqlRowType() :
+{
+    SqlParserPos pos;
+    List<SqlIdentifier> fieldNames = new ArrayList<SqlIdentifier>();
+    List<SqlDataTypeSpec> fieldTypes = new ArrayList<SqlDataTypeSpec>();
+}
+{
+    <ROW> { pos = getPos(); SqlIdentifier fName; SqlDataTypeSpec fType;}
+    <LT>
+    fName = SimpleIdentifier() <COLON> fType = DataType()
+    { fieldNames.add(fName); fieldTypes.add(fType); }
+    (
+        <COMMA>
+        fName = SimpleIdentifier() <COLON> fType = DataType()
+        { fieldNames.add(fName); fieldTypes.add(fType); }
+    )*
+    <GT>
+    {
+        return new SqlRowType(pos, fieldNames, fieldTypes);
+    }
+}
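
One subtlety in ComputedColumn above: for DDL like "col3 AS to_timestamp(col2)" it stores the expression and the alias under Calcite's AS operator in (expression, identifier) order, the reverse of the written DDL. A small sketch of the resulting node, using a plain identifier as a stand-in expression:

    import org.apache.calcite.sql.SqlCall;
    import org.apache.calcite.sql.SqlIdentifier;
    import org.apache.calcite.sql.fun.SqlStdOperatorTable;
    import org.apache.calcite.sql.parser.SqlParserPos;

    public class ComputedColumnSketch {
        public static void main(String[] args) {
            // Stand-in for the parsed expression; the real rule accepts any expression.
            SqlIdentifier expr = new SqlIdentifier("col2", SqlParserPos.ZERO);
            SqlIdentifier alias = new SqlIdentifier("col3", SqlParserPos.ZERO);
            SqlCall computed = SqlStdOperatorTable.AS.createCall(SqlParserPos.ZERO, expr, alias);
            // Unparses roughly as "`col2` AS `col3`"; SqlCreateTable.unparse swaps
            // the operands back into DDL order ("col3 AS col2").
            System.out.println(computed);
        }
    }
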
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlProperty.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlProperty.java
new file mode 100644
index 0000000..dbb58e6
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlProperty.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A key-value pair property of a DDL statement, e.g. one entry of a WITH clause.
+ */
+public class SqlProperty extends SqlCall {
+
+	/** Use this operator only if you don't have a better one. */
+	protected static final SqlOperator OPERATOR =
+		new SqlSpecialOperator("Property", SqlKind.OTHER);
+
+	private final SqlIdentifier key;
+	private final SqlNode value;
+
+	public SqlProperty(SqlIdentifier key, SqlNode value, SqlParserPos pos) {
+		super(pos);
+		this.key = requireNonNull(key, "Property key is missing");
+		this.value = requireNonNull(value, "Property value is missing");
+	}
+
+	public SqlIdentifier getKey() {
+		return key;
+	}
+
+	public SqlNode getValue() {
+		return value;
+	}
+
+	public String getKeyString() {
+		return key.toString();
+	}
+
+	public String getValueString() {
+		return ((NlsString) SqlLiteral.value(value)).getValue();
+	}
+
+	@Override
+	public SqlOperator getOperator() {
+		return OPERATOR;
+	}
+
+	@Override
+	public List<SqlNode> getOperandList() {
+		return ImmutableNullableList.of(key, value);
+	}
+
+	@Override
+	public void unparse(
+		SqlWriter writer,
+		int leftPrec,
+		int rightPrec) {
+		key.unparse(writer, leftPrec, rightPrec);
+		writer.keyword("=");
+		value.unparse(writer, leftPrec, rightPrec);
+	}
+}
+
+// End SqlProperty.java
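
A quick sketch of how a WITH-clause pair such as connector = 'csv' ends up in a SqlProperty (the key and value below are illustrative):

    import org.apache.calcite.sql.SqlIdentifier;
    import org.apache.calcite.sql.SqlLiteral;
    import org.apache.calcite.sql.parser.SqlParserPos;
    import org.apache.flink.sql.parser.SqlProperty;

    public class PropertySketch {
        public static void main(String[] args) {
            SqlIdentifier key = new SqlIdentifier("connector", SqlParserPos.ZERO);
            SqlLiteral value = SqlLiteral.createCharString("csv", SqlParserPos.ZERO);
            SqlProperty prop = new SqlProperty(key, value, SqlParserPos.ZERO);
            System.out.println(prop.getKeyString());   // connector
            System.out.println(prop.getValueString()); // csv
        }
    }
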
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/ExtendedSqlType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/ExtendedSqlType.java
new file mode 100644
index 0000000..135dc49
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/ExtendedSqlType.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlWriter;
+
+/** A marker interface for SQL types that Flink DDL supports but the default
+ * Calcite parser does not.
+ *
+ * <p>Note that every subclass must override
+ * {@link org.apache.calcite.sql.SqlNode#unparse(SqlWriter, int, int)}.
+ */
+public interface ExtendedSqlType {
+
+	static void unparseType(SqlDataTypeSpec type,
+			SqlWriter writer,
+			int leftPrec,
+			int rightPrec) {
+		if (type.getTypeName() instanceof ExtendedSqlType) {
+			type.getTypeName().unparse(writer, leftPrec, rightPrec);
+		} else {
+			type.unparse(writer, leftPrec, rightPrec);
+		}
+	}
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlArrayType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlArrayType.java
new file mode 100644
index 0000000..7d43d4f
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlArrayType.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Represents the ARRAY type in DDL column definitions, e.g. {@code ARRAY<INT>}.
+ */
+public class SqlArrayType extends SqlIdentifier implements ExtendedSqlType {
+
+	private final SqlDataTypeSpec elementType;
+
+	public SqlArrayType(SqlParserPos pos, SqlDataTypeSpec elementType) {
+		super(SqlTypeName.ARRAY.getName(), pos);
+		this.elementType = elementType;
+	}
+
+	public SqlDataTypeSpec getElementType() {
+		return elementType;
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		writer.keyword("ARRAY<");
+		ExtendedSqlType.unparseType(this.elementType, writer, leftPrec, rightPrec);
+		writer.keyword(">");
+	}
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlColumnType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlColumnType.java
new file mode 100644
index 0000000..3e494a72
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlColumnType.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+/**
+ * All supported data types in DDL. Used for Create Table DDL validation.
+ */
+public enum SqlColumnType {
+	BOOLEAN,
+	TINYINT,
+	SMALLINT,
+	INT,
+	INTEGER,
+	BIGINT,
+	REAL,
+	FLOAT,
+	DOUBLE,
+	DECIMAL,
+	DATE,
+	TIME,
+	TIMESTAMP,
+	VARCHAR,
+	VARBINARY,
+	ANY,
+	ARRAY,
+	MAP,
+	ROW,
+	UNSUPPORTED;
+
+	/** Returns the column type matching the given string representation. */
+	public static SqlColumnType getType(String type) {
+		if (type == null) {
+			return UNSUPPORTED;
+		}
+		try {
+			return SqlColumnType.valueOf(type.toUpperCase());
+		} catch (IllegalArgumentException ignored) {
+			return UNSUPPORTED;
+		}
+	}
+
+	/** Returns true if this type is unsupported. */
+	public boolean isUnsupported() {
+		return this.equals(UNSUPPORTED);
+	}
+}
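
getType above is the validation hook used by SqlCreateTable.validate(); lookup is case-insensitive and null-safe. For instance:

    import org.apache.flink.sql.parser.ddl.SqlColumnType;

    public class ColumnTypeSketch {
        public static void main(String[] args) {
            System.out.println(SqlColumnType.getType("varchar"));  // VARCHAR
            System.out.println(SqlColumnType.getType("GEOMETRY")); // UNSUPPORTED
            System.out.println(SqlColumnType.getType(null));       // UNSUPPORTED
        }
    }
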
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
new file mode 100644
index 0000000..47773cb
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.error.SqlParseException;
+
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.dialect.AnsiSqlDialect;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.pretty.SqlPrettyWriter;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * CREATE TABLE DDL sql call.
+ */
+public class SqlCreateTable extends SqlCreate {
+
+	public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE TABLE", SqlKind.CREATE_TABLE);
+
+	private final SqlIdentifier tableName;
+
+	private final SqlNodeList columnList;
+
+	private final SqlNodeList propertyList;
+
+	private final SqlNodeList primaryKeyList;
+
+	private final List<SqlNodeList> uniqueKeysList;
+
+	private final SqlNodeList partitionKeyList;
+
+	private final SqlCharStringLiteral comment;
+
+	public SqlCreateTable(
+			SqlParserPos pos,
+			SqlIdentifier tableName,
+			SqlNodeList columnList,
+			SqlNodeList primaryKeyList,
+			List<SqlNodeList> uniqueKeysList,
+			SqlNodeList propertyList,
+			SqlNodeList partitionKeyList,
+			SqlCharStringLiteral comment) {
+		super(OPERATOR, pos, false, false);
+		this.tableName = requireNonNull(tableName, "Table name is missing");
+		this.columnList = requireNonNull(columnList, "Column list should not be null");
+		this.primaryKeyList = primaryKeyList;
+		this.uniqueKeysList = uniqueKeysList;
+		this.propertyList = propertyList;
+		this.partitionKeyList = partitionKeyList;
+		this.comment = comment;
+	}
+
+	@Override
+	public SqlOperator getOperator() {
+		return OPERATOR;
+	}
+
+	@Override
+	public List<SqlNode> getOperandList() {
+		// Must not be null: Calcite visits and copies nodes through this list.
+		return ImmutableNullableList.of(tableName, columnList, primaryKeyList,
+			propertyList, partitionKeyList, comment);
+	}
+
+	public SqlIdentifier getTableName() {
+		return tableName;
+	}
+
+	public SqlNodeList getColumnList() {
+		return columnList;
+	}
+
+	public SqlNodeList getPropertyList() {
+		return propertyList;
+	}
+
+	public SqlNodeList getPartitionKeyList() {
+		return partitionKeyList;
+	}
+
+	public SqlNodeList getPrimaryKeyList() {
+		return primaryKeyList;
+	}
+
+	public List<SqlNodeList> getUniqueKeysList() {
+		return uniqueKeysList;
+	}
+
+	public SqlCharStringLiteral getComment() {
+		return comment;
+	}
+
+	public void validate() throws SqlParseException {
+		Set<String> columnNames = new HashSet<>();
+		if (columnList != null) {
+			for (SqlNode column : columnList) {
+				String columnName = null;
+				if (column instanceof SqlTableColumn) {
+					SqlTableColumn tableColumn = (SqlTableColumn) column;
+					columnName = tableColumn.getName().getSimple();
+					String typeName = tableColumn.getType().getTypeName().getSimple();
+					if (SqlColumnType.getType(typeName).isUnsupported()) {
+						throw new SqlParseException(
+							column.getParserPosition(),
+							"Unsupported type [" + typeName + "], at " + column.getParserPosition());
+					}
+				} else if (column instanceof SqlBasicCall) {
+					SqlBasicCall tableColumn = (SqlBasicCall) column;
+					columnName = tableColumn.getOperands()[1].toString();
+				}
+
+				if (!columnNames.add(columnName)) {
+					throw new SqlParseException(
+						column.getParserPosition(),
+						"Duplicate column name [" + columnName + "], at " +
+							column.getParserPosition());
+				}
+			}
+		}
+
+		if (this.primaryKeyList != null) {
+			for (SqlNode primaryKeyNode : this.primaryKeyList) {
+				String primaryKey = ((SqlIdentifier) primaryKeyNode).getSimple();
+				if (!columnNames.contains(primaryKey)) {
+					throw new SqlParseException(
+						primaryKeyNode.getParserPosition(),
+						"Primary key [" + primaryKey + "] not defined in columns, at " +
+							primaryKeyNode.getParserPosition());
+				}
+			}
+		}
+
+		if (this.uniqueKeysList != null) {
+			for (SqlNodeList uniqueKeys: this.uniqueKeysList) {
+				for (SqlNode uniqueKeyNode : uniqueKeys) {
+					String uniqueKey = ((SqlIdentifier) uniqueKeyNode).getSimple();
+					if (!columnNames.contains(uniqueKey)) {
+						throw new SqlParseException(
+								uniqueKeyNode.getParserPosition(),
+								"Unique key [" + uniqueKey + "] not defined in columns, at " + uniqueKeyNode.getParserPosition());
+					}
+				}
+			}
+		}
+
+		if (this.partitionKeyList != null) {
+			for (SqlNode partitionKeyNode : this.partitionKeyList.getList()) {
+				String partitionKey = ((SqlIdentifier) partitionKeyNode).getSimple();
+				if (!columnNames.contains(partitionKey)) {
+					throw new SqlParseException(
+						partitionKeyNode.getParserPosition(),
+						"Partition column [" + partitionKey + "] not defined in columns, at "
+							+ partitionKeyNode.getParserPosition());
+				}
+			}
+		}
+
+	}
+
+	public boolean containsComputedColumn() {
+		for (SqlNode column : columnList) {
+			if (column instanceof SqlBasicCall) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	/**
+	 * Returns the projection of the DDL columns (including computed columns)
+	 * as a SQL select list. E.g. for the DDL:
+	 * <pre>
+	 *   create table tbl1(
+	 *     col1 int,
+	 *     col2 varchar,
+	 *     col3 as to_timestamp(col2)
+	 *   ) with (
+	 *     connector = 'csv'
+	 *   )
+	 * </pre>
+	 * this returns:
+	 *
+	 * <p>"col1, col2, to_timestamp(col2) AS col3". Note that the operands of each
+	 * computed column appear in expression-first order, the reverse of the DDL text.
+	 */
+	public String getColumnSqlString() {
+		SqlPrettyWriter writer = new SqlPrettyWriter(AnsiSqlDialect.DEFAULT);
+		writer.setAlwaysUseParentheses(true);
+		writer.setSelectListItemsOnSeparateLines(false);
+		writer.setIndentation(0);
+		writer.startList("", "");
+		for (SqlNode column : columnList) {
+			writer.sep(",");
+			if (column instanceof SqlTableColumn) {
+				SqlTableColumn tableColumn = (SqlTableColumn) column;
+				tableColumn.getName().unparse(writer, 0, 0);
+			} else {
+				column.unparse(writer, 0, 0);
+			}
+		}
+
+		return writer.toString();
+	}
+
+	@Override
+	public void unparse(
+		SqlWriter writer,
+		int leftPrec,
+		int rightPrec) {
+		writer.keyword("CREATE TABLE");
+		tableName.unparse(writer, leftPrec, rightPrec);
+		SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
+		for (SqlNode column : columnList) {
+			printIndent(writer);
+			if (column instanceof SqlBasicCall) {
+				SqlCall call = (SqlCall) column;
+				SqlCall newCall = call.getOperator().createCall(
+					SqlParserPos.ZERO,
+					call.operand(1),
+					call.operand(0));
+				newCall.unparse(writer, leftPrec, rightPrec);
+			} else {
+				column.unparse(writer, leftPrec, rightPrec);
+			}
+		}
+		if (primaryKeyList != null && primaryKeyList.size() > 0) {
+			printIndent(writer);
+			writer.keyword("PRIMARY KEY");
+			SqlWriter.Frame keyFrame = writer.startList("(", ")");
+			primaryKeyList.unparse(writer, leftPrec, rightPrec);
+			writer.endList(keyFrame);
+		}
+		if (uniqueKeysList != null && uniqueKeysList.size() > 0) {
+			printIndent(writer);
+			for (SqlNodeList uniqueKeyList : uniqueKeysList) {
+				writer.keyword("UNIQUE");
+				SqlWriter.Frame keyFrame = writer.startList("(", ")");
+				uniqueKeyList.unparse(writer, leftPrec, rightPrec);
+				writer.endList(keyFrame);
+			}
+		}
+		writer.newlineAndIndent();
+		writer.endList(frame);
+
+		if (comment != null) {
+			writer.newlineAndIndent();
+			writer.keyword("COMMENT");
+			comment.unparse(writer, leftPrec, rightPrec);
+		}
+
+		if (this.partitionKeyList != null) {
+			writer.newlineAndIndent();
+			writer.keyword("PARTITIONED BY");
+			SqlWriter.Frame withFrame = writer.startList("(", ")");
+			this.partitionKeyList.unparse(writer, leftPrec, rightPrec);
+			writer.endList(withFrame);
+			writer.newlineAndIndent();
+		}
+
+		if (propertyList != null) {
+			writer.keyword("WITH");
+			SqlWriter.Frame withFrame = writer.startList("(", ")");
+			for (SqlNode property : propertyList) {
+				printIndent(writer);
+				property.unparse(writer, leftPrec, rightPrec);
+			}
+			writer.newlineAndIndent();
+			writer.endList(withFrame);
+		}
+	}
+
+	private void printIndent(SqlWriter writer) {
+		writer.sep(",", false);
+		writer.newlineAndIndent();
+		writer.print("  ");
+	}
+
+	/**
+	 * Table creation context.
+	 */
+	public static class TableCreationContext {
+		public List<SqlNode> columnList = new ArrayList<>();
+		public SqlNodeList primaryKeyList;
+		public List<SqlNodeList> uniqueKeysList = new ArrayList<>();
+	}
+
+	public String[] fullTableName() {
+		return tableName.names.toArray(new String[0]);
+	}
+}
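
An end-to-end sketch putting SqlCreateTable together with the generated parser (reusing the ParserSketch setup above; my_func is an arbitrary function name, which is not validated at parse time):

    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.parser.SqlParser;
    import org.apache.flink.sql.parser.ddl.SqlCreateTable;
    import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;

    public class CreateTableSketch {
        public static void main(String[] args) throws Exception {
            SqlNode stmt = SqlParser.create(
                "CREATE TABLE t (a INT, b VARCHAR, c AS my_func(b))",
                SqlParser.configBuilder()
                    .setParserFactory(FlinkSqlParserImpl.FACTORY)
                    .build())
                .parseStmt();
            SqlCreateTable createTable = (SqlCreateTable) stmt;
            createTable.validate(); // duplicate columns, unsupported types, dangling keys
            if (createTable.containsComputedColumn()) {
                // Roughly "a, b, my_func(b) AS c": a projection over the source fields.
                System.out.println(createTable.getColumnSqlString());
            }
        }
    }
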
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropTable.java
new file mode 100644
index 0000000..ee544c4
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropTable.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlDrop;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+/**
+ * DROP TABLE DDL sql call.
+ */
+public class SqlDropTable extends SqlDrop {
+	private static final SqlOperator OPERATOR =
+		new SqlSpecialOperator("DROP TABLE", SqlKind.DROP_TABLE);
+
+	private SqlIdentifier tableName;
+	private boolean ifExists;
+
+	public SqlDropTable(SqlParserPos pos, SqlIdentifier tableName, boolean ifExists) {
+		super(OPERATOR, pos, ifExists);
+		this.tableName = tableName;
+		this.ifExists = ifExists;
+	}
+
+	@Override
+	public List<SqlNode> getOperandList() {
+		return ImmutableNullableList.of(tableName);
+	}
+
+	public SqlIdentifier getTableName() {
+		return tableName;
+	}
+
+	public void setTableName(SqlIdentifier tableName) {
+		this.tableName = tableName;
+	}
+
+	public boolean getIfExists() {
+		return this.ifExists;
+	}
+
+	public void setIfExists(boolean ifExists) {
+		this.ifExists = ifExists;
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		writer.keyword("DROP");
+		writer.keyword("TABLE");
+		if (ifExists) {
+			writer.keyword("IF EXISTS");
+		}
+		tableName.unparse(writer, leftPrec, rightPrec);
+	}
+
+	public void validate() {
+		// no-op
+	}
+
+	public String[] fullTableName() {
+		return tableName.names.toArray(new String[0]);
+	}
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlMapType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlMapType.java
new file mode 100644
index 0000000..98f549a
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlMapType.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Extended Flink MapType.
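+ *
+ * <p>Unparses to, for example, {@code MAP< INTEGER, VARCHAR >}
+ * (spacing as produced by {@link #unparse}).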
+ */
+public class SqlMapType extends SqlIdentifier implements ExtendedSqlType {
+
+	private final SqlDataTypeSpec keyType;
+	private final SqlDataTypeSpec valType;
+
+	public SqlMapType(SqlParserPos pos, SqlDataTypeSpec keyType, SqlDataTypeSpec valType) {
+		super(SqlTypeName.MAP.getName(), pos);
+		this.keyType = keyType;
+		this.valType = valType;
+	}
+
+	public SqlDataTypeSpec getKeyType() {
+		return keyType;
+	}
+
+	public SqlDataTypeSpec getValType() {
+		return valType;
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		writer.keyword("MAP<");
+		ExtendedSqlType.unparseType(keyType, writer, leftPrec, rightPrec);
+		writer.sep(",");
+		ExtendedSqlType.unparseType(valType, writer, leftPrec, rightPrec);
+		writer.keyword(">");
+	}
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlRowType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlRowType.java
new file mode 100644
index 0000000..3958053
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlRowType.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Pair;
+
+import java.util.List;
+
+/**
+ * Extended Flink RowType.
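+ *
+ * <p>Unparses to, for example,
+ * {@code ROW< `CC0` : INTEGER, `CC1` : FLOAT, `CC2` : VARCHAR >}
+ * (output as asserted in the parser tests).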
+ */
+public class SqlRowType extends SqlIdentifier implements ExtendedSqlType {
+
+	private final List<SqlIdentifier> fieldNames;
+	private final List<SqlDataTypeSpec> fieldTypes;
+
+	public SqlRowType(SqlParserPos pos,
+			List<SqlIdentifier> fieldNames,
+			List<SqlDataTypeSpec> fieldTypes) {
+		super(SqlTypeName.ROW.getName(), pos);
+		this.fieldNames = fieldNames;
+		this.fieldTypes = fieldTypes;
+	}
+
+	public List<SqlIdentifier> getFieldNames() {
+		return fieldNames;
+	}
+
+	public List<SqlDataTypeSpec> getFieldTypes() {
+		return fieldTypes;
+	}
+
+	public int getArity() {
+		return fieldNames.size();
+	}
+
+	public SqlIdentifier getFieldName(int i) {
+		return fieldNames.get(i);
+	}
+
+	public SqlDataTypeSpec getFieldType(int i) {
+		return fieldTypes.get(i);
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		writer.keyword("ROW");
+		SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">");
+		for (Pair<SqlIdentifier, SqlDataTypeSpec> p : Pair.zip(this.fieldNames, this.fieldTypes)) {
+			writer.sep(",", false);
+			p.left.unparse(writer, 0, 0);
+			writer.sep(":");
+			ExtendedSqlType.unparseType(p.right, writer, leftPrec, rightPrec);
+		}
+		writer.endList(frame);
+	}
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java
new file mode 100644
index 0000000..bcd578f
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Table column of a CREATE TABLE DDL.
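+ *
+ * <p>For example, the column declaration
+ * {@code a BIGINT COMMENT 'test column comment AAA.'} in a CREATE TABLE
+ * statement parses into this call.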
+ */
+public class SqlTableColumn extends SqlCall {
+
+	private SqlIdentifier name;
+	private SqlDataTypeSpec type;
+	private SqlCharStringLiteral comment;
+
+	public SqlTableColumn(SqlIdentifier name,
+			SqlDataTypeSpec type,
+			SqlCharStringLiteral comment,
+			SqlParserPos pos) {
+		super(pos);
+		this.name = requireNonNull(name, "Column name should not be null");
+		this.type = requireNonNull(type, "Column type should not be null");
+		this.comment = comment;
+	}
+
+	@Override
+	public SqlOperator getOperator() {
+		return null;
+	}
+
+	@Override
+	public List<SqlNode> getOperandList() {
+		return null;
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		this.name.unparse(writer, leftPrec, rightPrec);
+		writer.print(" ");
+		ExtendedSqlType.unparseType(type, writer, leftPrec, rightPrec);
+		if (this.comment != null) {
+			writer.print(" COMMENT ");
+			this.comment.unparse(writer, leftPrec, rightPrec);
+		}
+	}
+
+	public SqlIdentifier getName() {
+		return name;
+	}
+
+	public void setName(SqlIdentifier name) {
+		this.name = name;
+	}
+
+	public SqlDataTypeSpec getType() {
+		return type;
+	}
+
+	public void setType(SqlDataTypeSpec type) {
+		this.type = type;
+	}
+
+	public SqlCharStringLiteral getComment() {
+		return comment;
+	}
+
+	public void setComment(SqlCharStringLiteral comment) {
+		this.comment = comment;
+	}
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/error/SqlParseException.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/error/SqlParseException.java
new file mode 100644
index 0000000..365de20
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/error/SqlParseException.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.error;
+
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/**
+ * SQL parse exception: a simpler version of Calcite's
+ * {@link org.apache.calcite.sql.parser.SqlParseException},
+ * used for SqlNode validation.
+ */
+public class SqlParseException extends Exception {
+
+	private SqlParserPos errorPosition;
+
+	private String message;
+
+	public SqlParseException(SqlParserPos errorPosition, String message) {
+		this.errorPosition = errorPosition;
+		this.message = message;
+	}
+
+	public SqlParseException(SqlParserPos errorPosition, String message, Exception e) {
+		super(e);
+		this.errorPosition = errorPosition;
+		this.message = message;
+	}
+
+	@Override
+	public String getMessage() {
+		return message;
+	}
+
+	public void setMessage(String message) {
+		this.message = message;
+	}
+
+	public SqlParserPos getErrorPosition() {
+		return errorPosition;
+	}
+
+	public void setErrorPosition(SqlParserPos errorPosition) {
+		this.errorPosition = errorPosition;
+	}
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/SqlTimeUnit.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/SqlTimeUnit.java
new file mode 100644
index 0000000..950399a
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/SqlTimeUnit.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.utils;
+
+import org.apache.calcite.sql.SqlWriter;
+
+/** SqlTimeUnit used in Flink DDL statements. */
+public enum SqlTimeUnit {
+	DAY("DAY", 24 * 3600 * 1000),
+	HOUR("HOUR", 3600 * 1000),
+	MINUTE("MINUTE", 60 * 1000),
+	SECOND("SECOND", 1000),
+	MILLISECOND("MILLISECOND", 1);
+
+	/** Keyword used when unparsing. */
+	private final String keyword;
+	/** Factor used to convert one of this time unit to milliseconds. */
+	private final long timeToMillisecond;
+
+	SqlTimeUnit(String keyword, long timeToMillisecond) {
+		this.keyword = keyword;
+		this.timeToMillisecond = timeToMillisecond;
+	}
+
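+	/**
+	 * Converts a time interval expressed in this unit to milliseconds,
+	 * e.g. {@code MINUTE.populateAsMillisecond(2)} returns 120000.
+	 */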
+	public long populateAsMillisecond(int timeInterval) {
+		return timeToMillisecond * timeInterval;
+	}
+
+	public void unparse(SqlWriter writer) {
+		writer.keyword(keyword);
+	}
+
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
new file mode 100644
index 0000000..d6d88be
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.error.SqlParseException;
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
+
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParserImplFactory;
+import org.apache.calcite.sql.parser.SqlParserTest;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** FlinkSqlParserImpl tests. **/
+public class FlinkSqlParserImplTest extends SqlParserTest {
+
+	@Override
+	protected SqlParserImplFactory parserImplFactory() {
+		return FlinkSqlParserImpl.FACTORY;
+	}
+
+	@Test
+	public void testCreateTable() {
+		check("CREATE TABLE tbl1 (\n" +
+				"  a bigint,\n" +
+				"  h varchar, \n" +
+				"  g as 2 * (a + 1), \n" +
+				"  ts as toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'), \n" +
+				"  b varchar,\n" +
+				"  proc as PROCTIME(), \n" +
+				"  PRIMARY KEY (a, b)\n" +
+				")\n" +
+				"PARTITIONED BY (a, h)\n" +
+				"  with (\n" +
+				"    connector = 'kafka', \n" +
+				"    kafka.topic = 'log.test'\n" +
+				")\n",
+			"CREATE TABLE `TBL1` (\n" +
+				"  `A`  BIGINT,\n" +
+				"  `H`  VARCHAR,\n" +
+				"  `G` AS (2 * (`A` + 1)),\n" +
+				"  `TS` AS `TOTIMESTAMP`(`B`, 'yyyy-MM-dd HH:mm:ss'),\n" +
+				"  `B`  VARCHAR,\n" +
+				"  `PROC` AS `PROCTIME`(),\n" +
+				"  PRIMARY KEY (`A`, `B`)\n" +
+				")\n" +
+				"PARTITIONED BY (`A`, `H`)\n" +
+				"WITH (\n" +
+				"  `CONNECTOR` = 'kafka',\n" +
+				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				")");
+	}
+
+	@Test
+	public void testCreateTableWithComment() {
+		check("CREATE TABLE tbl1 (\n" +
+				"  a bigint comment 'test column comment AAA.',\n" +
+				"  h varchar, \n" +
+				"  g as 2 * (a + 1), \n" +
+				"  ts as toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'), \n" +
+				"  b varchar,\n" +
+				"  proc as PROCTIME(), \n" +
+				"  PRIMARY KEY (a, b)\n" +
+				")\n" +
+				"comment 'test table comment ABC.'\n" +
+				"PARTITIONED BY (a, h)\n" +
+				"  with (\n" +
+				"    connector = 'kafka', \n" +
+				"    kafka.topic = 'log.test'\n" +
+				")\n",
+			"CREATE TABLE `TBL1` (\n" +
+				"  `A`  BIGINT  COMMENT 'test column comment AAA.',\n" +
+				"  `H`  VARCHAR,\n" +
+				"  `G` AS (2 * (`A` + 1)),\n" +
+				"  `TS` AS `TOTIMESTAMP`(`B`, 'yyyy-MM-dd HH:mm:ss'),\n" +
+				"  `B`  VARCHAR,\n" +
+				"  `PROC` AS `PROCTIME`(),\n" +
+				"  PRIMARY KEY (`A`, `B`)\n" +
+				")\n" +
+				"COMMENT 'test table comment ABC.'\n" +
+				"PARTITIONED BY (`A`, `H`)\n" +
+				"WITH (\n" +
+				"  `CONNECTOR` = 'kafka',\n" +
+				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				")");
+	}
+
+	@Test
+	public void testCreateTableWithPrimaryKeyAndUniqueKey() {
+		check("CREATE TABLE tbl1 (\n" +
+				"  a bigint comment 'test column comment AAA.',\n" +
+				"  h varchar, \n" +
+				"  g as 2 * (a + 1), \n" +
+				"  ts as toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'), \n" +
+				"  b varchar,\n" +
+				"  proc as PROCTIME(), \n" +
+				"  PRIMARY KEY (a, b), \n" +
+				"  UNIQUE (h, g)\n" +
+				")\n" +
+				"comment 'test table comment ABC.'\n" +
+				"PARTITIONED BY (a, h)\n" +
+				"  with (\n" +
+				"    connector = 'kafka', \n" +
+				"    kafka.topic = 'log.test'\n" +
+				")\n",
+			"CREATE TABLE `TBL1` (\n" +
+				"  `A`  BIGINT  COMMENT 'test column comment AAA.',\n" +
+				"  `H`  VARCHAR,\n" +
+				"  `G` AS (2 * (`A` + 1)),\n" +
+				"  `TS` AS `TOTIMESTAMP`(`B`, 'yyyy-MM-dd HH:mm:ss'),\n" +
+				"  `B`  VARCHAR,\n" +
+				"  `PROC` AS `PROCTIME`(),\n" +
+				"  PRIMARY KEY (`A`, `B`),\n" +
+				"  UNIQUE (`H`, `G`)\n" +
+				")\n" +
+				"COMMENT 'test table comment ABC.'\n" +
+				"PARTITIONED BY (`A`, `H`)\n" +
+				"WITH (\n" +
+				"  `CONNECTOR` = 'kafka',\n" +
+				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				")");
+	}
+
+	@Ignore // need to implement
+	@Test
+	public void testCreateTableWithoutWatermarkFieldName() {
+		check("CREATE TABLE tbl1 (\n" +
+				"  a bigint,\n" +
+				"  b varchar, \n" +
+				"  c as 2 * (a + 1), \n" +
+				"  WATERMARK FOR a AS BOUNDED WITH DELAY 1000 MILLISECOND\n" +
+				")\n" +
+				"  with (\n" +
+				"    connector = 'kafka', \n" +
+				"    kafka.topic = 'log.test'\n" +
+				")\n",
+			"CREATE TABLE `TBL1` (\n" +
+				"  `A`  BIGINT,\n" +
+				"  `B`  VARCHAR,\n" +
+				"  `C` AS (2 * (`A` + 1)),\n" +
+				"  WATERMARK FOR `A` AS BOUNDED WITH DELAY 1000 MILLISECOND\n" +
+				") WITH (\n" +
+				"  `CONNECTOR` = 'kafka',\n" +
+				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				")");
+	}
+
+	@Ignore // need to implement
+	@Test
+	public void testCreateTableWithWatermarkBoundedDelay() {
+		check("CREATE TABLE tbl1 (\n" +
+				"  a bigint,\n" +
+				"  b varchar, \n" +
+				"  c as 2 * (a + 1), \n" +
+				"  WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 DAY\n" +
+				")\n" +
+				"  with (\n" +
+				"    connector = 'kafka', \n" +
+				"    kafka.topic = 'log.test'\n" +
+				")\n",
+			"CREATE TABLE `TBL1` (\n" +
+				"  `A`  BIGINT,\n" +
+				"  `B`  VARCHAR,\n" +
+				"  `C` AS (2 * (`A` + 1)),\n" +
+				"  WATERMARK `WK` FOR `A` AS BOUNDED WITH DELAY 1000 DAY\n" +
+				") WITH (\n" +
+				"  `CONNECTOR` = 'kafka',\n" +
+				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				")");
+	}
+
+	@Ignore // need to implement
+	@Test
+	public void testCreateTableWithWatermarkBoundedDelay1() {
+		check("CREATE TABLE tbl1 (\n" +
+				"  a bigint,\n" +
+				"  b varchar, \n" +
+				"  c as 2 * (a + 1), \n" +
+				"  WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 HOUR\n" +
+				")\n" +
+				"  with (\n" +
+				"    connector = 'kafka', \n" +
+				"    kafka.topic = 'log.test'\n" +
+				")\n",
+			"CREATE TABLE `TBL1` (\n" +
+				"  `A`  BIGINT,\n" +
+				"  `B`  VARCHAR,\n" +
+				"  `C` AS (2 * (`A` + 1)),\n" +
+				"  WATERMARK `WK` FOR `A` AS BOUNDED WITH DELAY 1000 HOUR\n" +
+				") WITH (\n" +
+				"  `CONNECTOR` = 'kafka',\n" +
+				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				")");
+	}
+
+	@Ignore // need to implement
+	@Test
+	public void testCreateTableWithWatermarkBoundedDelay2() {
+		check("CREATE TABLE tbl1 (\n" +
+				"  a bigint,\n" +
+				"  b varchar, \n" +
+				"  c as 2 * (a + 1), \n" +
+				"  WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 MINUTE\n" +
+				")\n" +
+				"  with (\n" +
+				"    connector = 'kafka', \n" +
+				"    kafka.topic = 'log.test'\n" +
+				")\n",
+			"CREATE TABLE `TBL1` (\n" +
+				"  `A`  BIGINT,\n" +
+				"  `B`  VARCHAR,\n" +
+				"  `C` AS (2 * (`A` + 1)),\n" +
+				"  WATERMARK `WK` FOR `A` AS BOUNDED WITH DELAY 1000 MINUTE\n" +
+				") WITH (\n" +
+				"  `CONNECTOR` = 'kafka',\n" +
+				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				")");
+	}
+
+	@Ignore // need to implement
+	@Test
+	public void testCreateTableWithWatermarkBoundedDelay3() {
+		check("CREATE TABLE tbl1 (\n" +
+				"  a bigint,\n" +
+				"  b varchar, \n" +
+				"  c as 2 * (a + 1), \n" +
+				"  WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 SECOND\n" +
+				")\n" +
+				"  with (\n" +
+				"    connector = 'kafka', \n" +
+				"    kafka.topic = 'log.test'\n" +
+				")\n",
+			"CREATE TABLE `TBL1` (\n" +
+				"  `A`  BIGINT,\n" +
+				"  `B`  VARCHAR,\n" +
+				"  `C` AS (2 * (`A` + 1)),\n" +
+				"  WATERMARK `WK` FOR `A` AS BOUNDED WITH DELAY 1000 SECOND\n" +
+				") WITH (\n" +
+				"  `CONNECTOR` = 'kafka',\n" +
+				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				")");
+	}
+
+	@Ignore // need to implement
+	@Test
+	public void testCreateTableWithNegativeWatermarkOffsetDelay() {
+		checkFails("CREATE TABLE tbl1 (\n" +
+				"  a bigint,\n" +
+				"  b varchar, \n" +
+				"  c as 2 * (a + 1), \n" +
+				"  WATERMARK wk FOR a AS BOUNDED WITH DELAY ^-^1000 SECOND\n" +
+				")\n" +
+				"  with (\n" +
+				"    connector = 'kafka', \n" +
+				"    kafka.topic = 'log.test'\n" +
+				")\n",
+			"(?s).*Encountered \"-\" at line 5, column 44.\n" +
+				"Was expecting:\n" +
+				"    <UNSIGNED_INTEGER_LITERAL> ...\n" +
+				".*");
+	}
+
+	@Ignore // need to implement
+	@Test
+	public void testCreateTableWithWatermarkStrategyAscending() {
+		check("CREATE TABLE tbl1 (\n" +
+				"  a bigint,\n" +
+				"  b varchar, \n" +
+				"  c as 2 * (a + 1), \n" +
+				"  WATERMARK wk FOR a AS ASCENDING\n" +
+				")\n" +
+				"  with (\n" +
+				"    connector = 'kafka', \n" +
+				"    kafka.topic = 'log.test'\n" +
+				")\n",
+			"CREATE TABLE `TBL1` (\n" +
+				"  `A`  BIGINT,\n" +
+				"  `B`  VARCHAR,\n" +
+				"  `C` AS (2 * (`A` + 1)),\n" +
+				"  WATERMARK `WK` FOR `A` AS ASCENDING\n" +
+				") WITH (\n" +
+				"  `CONNECTOR` = 'kafka',\n" +
+				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				")");
+	}
+
+	@Ignore // need to implement
+	@Test
+	public void testCreateTableWithWatermarkStrategyFromSource() {
+		check("CREATE TABLE tbl1 (\n" +
+				"  a bigint,\n" +
+				"  b varchar, \n" +
+				"  c as 2 * (a + 1), \n" +
+				"  WATERMARK wk FOR a AS FROM_SOURCE\n" +
+				")\n" +
+				"  with (\n" +
+				"    connector = 'kafka', \n" +
+				"    kafka.topic = 'log.test'\n" +
+				")\n",
+			"CREATE TABLE `TBL1` (\n" +
+				"  `A`  BIGINT,\n" +
+				"  `B`  VARCHAR,\n" +
+				"  `C` AS (2 * (`A` + 1)),\n" +
+				"  WATERMARK `WK` FOR `A` AS FROM_SOURCE\n" +
+				") WITH (\n" +
+				"  `CONNECTOR` = 'kafka',\n" +
+				"  `KAFKA`.`TOPIC` = 'log.test'\n" +
+				")");
+	}
+
+	@Test
+	public void testCreateTableWithComplexType() {
+		check("CREATE TABLE tbl1 (\n" +
+			"  a ARRAY<bigint>, \n" +
+			"  b MAP<int, varchar>,\n" +
+			"  c ROW<cc0:int, cc1: float, cc2: varchar>,\n" +
+			"  PRIMARY KEY (a, b) \n" +
+			") with (\n" +
+			"  x = 'y', \n" +
+			"  asd = 'data'\n" +
+			")\n", "CREATE TABLE `TBL1` (\n" +
+			"  `A`  ARRAY< BIGINT >,\n" +
+			"  `B`  MAP< INTEGER, VARCHAR >,\n" +
+			"  `C`  ROW< `CC0` : INTEGER, `CC1` : FLOAT, `CC2` : VARCHAR >,\n" +
+			"  PRIMARY KEY (`A`, `B`)\n" +
+			") WITH (\n" +
+			"  `X` = 'y',\n" +
+			"  `ASD` = 'data'\n" +
+			")");
+	}
+
+	@Test
+	public void testCreateTableWithDecimalType() {
+		check("CREATE TABLE tbl1 (\n" +
+			"  a decimal, \n" +
+			"  b decimal(10, 0),\n" +
+			"  c decimal(38, 38),\n" +
+			"  PRIMARY KEY (a, b) \n" +
+			") with (\n" +
+			"  x = 'y', \n" +
+			"  asd = 'data'\n" +
+			")\n", "CREATE TABLE `TBL1` (\n" +
+			"  `A`  DECIMAL,\n" +
+			"  `B`  DECIMAL(10, 0),\n" +
+			"  `C`  DECIMAL(38, 38),\n" +
+			"  PRIMARY KEY (`A`, `B`)\n" +
+			") WITH (\n" +
+			"  `X` = 'y',\n" +
+			"  `ASD` = 'data'\n" +
+			")");
+	}
+
+	@Test
+	public void testCreateTableWithNestedComplexType() {
+		check("CREATE TABLE tbl1 (\n" +
+			"  a ARRAY<ARRAY<bigint>>, \n" +
+			"  b MAP<MAP<int, varchar>, ARRAY<varchar>>,\n" +
+			"  c ROW<cc0:ARRAY<int>, cc1: float, cc2: varchar>,\n" +
+			"  PRIMARY KEY (a, b) \n" +
+			") with (\n" +
+			"  x = 'y', \n" +
+			"  asd = 'data'\n" +
+			")\n", "CREATE TABLE `TBL1` (\n" +
+			"  `A`  ARRAY< ARRAY< BIGINT > >,\n" +
+			"  `B`  MAP< MAP< INTEGER, VARCHAR >, ARRAY< VARCHAR > >,\n" +
+			"  `C`  ROW< `CC0` : ARRAY< INTEGER >, `CC1` : FLOAT, `CC2` : VARCHAR >,\n" +
+			"  PRIMARY KEY (`A`, `B`)\n" +
+			") WITH (\n" +
+			"  `X` = 'y',\n" +
+			"  `ASD` = 'data'\n" +
+			")");
+	}
+
+	@Test
+	public void testInvalidComputedColumn() {
+		checkFails("CREATE TABLE sls_stream (\n" +
+			"  a bigint, \n" +
+			"  b varchar,\n" +
+			"  ^toTimestamp^(b, 'yyyy-MM-dd HH:mm:ss'), \n" +
+			"  PRIMARY KEY (a, b) \n" +
+			") with (\n" +
+			"  x = 'y', \n" +
+			"  asd = 'data'\n" +
+			")\n", "(?s).*Encountered \"toTimestamp \\(\" at line 4, column 3.\n" +
+			"Was expecting one of:\n" +
+			"    <IDENTIFIER> \"CHARACTER\" ...\n" +
+			"    <IDENTIFIER> \"CHAR\" ...\n" +
+			".*");
+	}
+
+	@Test
+	public void testColumnSqlString() {
+		String sql = "CREATE TABLE sls_stream (\n" +
+			"  a bigint, \n" +
+			"  f as a + 1, \n" +
+			"  b varchar,\n" +
+			"  ts as toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'), \n" +
+			"  proc as PROCTIME(),\n" +
+			"  c int,\n" +
+			"  PRIMARY KEY (a, b) \n" +
+			") with (\n" +
+			"  x = 'y', \n" +
+			"  asd = 'data'\n" +
+			")\n";
+		String expected = "`A`, (`A` + 1) AS `F`, `B`, "
+			+ "`TOTIMESTAMP`(`B`, 'yyyy-MM-dd HH:mm:ss') AS `TS`, "
+			+ "`PROCTIME`() AS `PROC`, `C`";
+		sql(sql).node(new ValidationMatcher()
+			.expectColumnSql(expected));
+	}
+
+	@Test
+	public void testCreateInvalidPartitionedTable() {
+		String sql = "create table sls_stream1(\n" +
+			"  a bigint,\n" +
+			"  b VARCHAR,\n" +
+			"  PRIMARY KEY(a, b)\n" +
+			") PARTITIONED BY (\n" +
+			"  c,\n" +
+			"  d\n" +
+			") with ( x = 'y', asd = 'dada')";
+		sql(sql).node(new ValidationMatcher()
+			.fails("Partition column [C] not defined in columns, at line 6, column 3"));
+	}
+
+	@Test
+	public void testDropTable() {
+		String sql = "DROP table catalog1.db1.tbl1";
+		check(sql, "DROP TABLE `CATALOG1`.`DB1`.`TBL1`");
+	}
+
+	@Test
+	public void testDropIfExists() {
+		String sql = "DROP table IF EXISTS catalog1.db1.tbl1";
+		check(sql, "DROP TABLE IF EXISTS `CATALOG1`.`DB1`.`TBL1`");
+	}
+
+	/** Matcher that invokes {@code #validate()} on the produced SqlNode. **/
+	private static class ValidationMatcher extends BaseMatcher<SqlNode> {
+		private String expectedColumnSql;
+		private String failMsg;
+
+		public ValidationMatcher expectColumnSql(String s) {
+			this.expectedColumnSql = s;
+			return this;
+		}
+
+		public ValidationMatcher fails(String failMsg) {
+			this.failMsg = failMsg;
+			return this;
+		}
+
+		@Override
+		public void describeTo(Description description) {
+			description.appendText("test");
+		}
+
+		@Override
+		public boolean matches(Object item) {
+			if (item instanceof SqlCreateTable) {
+				SqlCreateTable createTable = (SqlCreateTable) item;
+				try {
+					createTable.validate();
+				} catch (SqlParseException e) {
+					assertEquals(failMsg, e.getMessage());
+				}
+				if (expectedColumnSql != null) {
+					assertEquals(expectedColumnSql, createTable.getColumnSqlString());
+				}
+				return true;
+			} else {
+				return false;
+			}
+		}
+	}
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlUnParserTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlUnParserTest.java
new file mode 100644
index 0000000..ce3ac2d
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlUnParserTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+/**
+ * Extension to {@link FlinkSqlParserImplTest} that ensures that every expression can
+ * un-parse successfully.
+ */
+public class FlinkSqlUnParserTest extends FlinkSqlParserImplTest {
+	//~ Constructors -----------------------------------------------------------
+
+	public FlinkSqlUnParserTest() {
+	}
+
+	//~ Methods ----------------------------------------------------------------
+
+	@Override
+	protected boolean isUnparserTest() {
+		return true;
+	}
+
+	@Override
+	protected Tester getTester() {
+		return new UnparsingTesterImpl();
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/pom.xml b/flink-table/flink-table-planner-blink/pom.xml
index f960b47..7715207 100644
--- a/flink-table/flink-table-planner-blink/pom.xml
+++ b/flink-table/flink-table-planner-blink/pom.xml
@@ -102,6 +102,12 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-sql-parser</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-scala_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlExecutableStatement.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlExecutableStatement.java
new file mode 100644
index 0000000..7ac3106
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlExecutableStatement.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sqlexec;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.ReflectUtil;
+import org.apache.calcite.util.ReflectiveVisitor;
+
+/**
+ * Mix-in tool class for {@code SqlNode} that allows DDL commands to be
+ * executed directly.
+ *
+ * <p>For every kind of {@link SqlNode}, there needs to be a corresponding
+ * {@code #execute(type)} method, where the 'type' argument is the subclass
+ * type of the supported {@link SqlNode}.
+ */
+public class SqlExecutableStatement implements ReflectiveVisitor {
+	private TableEnvironment tableEnv;
+
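+	/**
+	 * Reflective dispatcher that routes a SqlNode to the most specific
+	 * {@code #execute(type)} overload, falling back to {@link #execute(SqlNode)}.
+	 */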
+	private final ReflectUtil.MethodDispatcher<Void> dispatcher =
+		ReflectUtil.createMethodDispatcher(Void.class,
+			this,
+			"execute",
+			SqlNode.class);
+
+	//~ Constructors -----------------------------------------------------------
+
+	private SqlExecutableStatement(TableEnvironment tableEnvironment) {
+		this.tableEnv = tableEnvironment;
+	}
+
+	/**
+	 * This is the main entry point for executing all kinds of DDL/DML {@code SqlNode}s; each
+	 * supported SqlNode subclass has its implementation in the {@code #execute(type)} method
+	 * whose 'type' argument is that subclass of {@code SqlNode}.
+	 *
+	 * <p>Note that the fallback {@link #execute(SqlNode)} is never expected to be invoked.
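+	 *
+	 * <p>A usage sketch (assuming a Calcite {@code SqlParser} instance named {@code parser}):
+	 * <pre>{@code
+	 * SqlNode node = parser.parseStmt();
+	 * SqlExecutableStatement.executeSqlNode(tableEnv, node);
+	 * }</pre>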
+	 *
+	 * @param tableEnvironment TableEnvironment to interact with
+	 * @param sqlNode          SqlNode to execute on
+	 */
+	public static void executeSqlNode(TableEnvironment tableEnvironment, SqlNode sqlNode) {
+		SqlExecutableStatement statement = new SqlExecutableStatement(tableEnvironment);
+		statement.dispatcher.invoke(sqlNode);
+	}
+
+	/**
+	 * Execute the {@link SqlCreateTable} node.
+	 */
+	public void execute(SqlCreateTable sqlCreateTable) {
+		// need to implement.
+	}
+
+	/** Fallback method that throws for unsupported node types. */
+	public void execute(SqlNode node) {
+		throw new TableException("Unsupported node type "
+			+ node.getClass().getSimpleName());
+	}
+}
diff --git a/flink-table/pom.xml b/flink-table/pom.xml
index 5213ea5..57918da 100644
--- a/flink-table/pom.xml
+++ b/flink-table/pom.xml
@@ -43,6 +43,7 @@ under the License.
 		<module>flink-table-runtime-blink</module>
 		<module>flink-table-uber</module>
 		<module>flink-sql-client</module>
+		<module>flink-sql-parser</module>
 	</modules>
 
 	<properties>