You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/05/31 09:28:51 UTC
[flink] branch master updated: [FLINK-6962][table] Add sql parser
module and support CREATE / DROP table
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3710bcb [FLINK-6962][table] Add sql parser module and support CREATE / DROP table
3710bcb is described below
commit 3710bcbc195f9a6304bfb452a58bb4a96804cc24
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri May 31 17:28:39 2019 +0800
[FLINK-6962][table] Add sql parser module and support CREATE / DROP table
This closes #8548
---
flink-table/flink-sql-parser/pom.xml | 297 ++++++++++++
.../flink-sql-parser/src/main/codegen/config.fmpp | 41 ++
.../src/main/codegen/data/Parser.tdd | 430 ++++++++++++++++++
.../main/codegen/includes/compoundIdentifier.ftl | 34 ++
.../src/main/codegen/includes/parserImpls.ftl | 289 ++++++++++++
.../org/apache/flink/sql/parser/SqlProperty.java | 91 ++++
.../flink/sql/parser/ddl/ExtendedSqlType.java | 42 ++
.../apache/flink/sql/parser/ddl/SqlArrayType.java | 49 ++
.../apache/flink/sql/parser/ddl/SqlColumnType.java | 62 +++
.../flink/sql/parser/ddl/SqlCreateTable.java | 320 +++++++++++++
.../apache/flink/sql/parser/ddl/SqlDropTable.java | 87 ++++
.../apache/flink/sql/parser/ddl/SqlMapType.java | 57 +++
.../apache/flink/sql/parser/ddl/SqlRowType.java | 78 ++++
.../flink/sql/parser/ddl/SqlTableColumn.java | 97 ++++
.../flink/sql/parser/error/SqlParseException.java | 60 +++
.../apache/flink/sql/parser/utils/SqlTimeUnit.java | 49 ++
.../flink/sql/parser/FlinkSqlParserImplTest.java | 498 +++++++++++++++++++++
.../flink/sql/parser/FlinkSqlUnParserTest.java | 42 ++
flink-table/flink-table-planner-blink/pom.xml | 6 +
.../table/sqlexec/SqlExecutableStatement.java | 79 ++++
flink-table/pom.xml | 1 +
21 files changed, 2709 insertions(+)
diff --git a/flink-table/flink-sql-parser/pom.xml b/flink-table/flink-sql-parser/pom.xml
new file mode 100644
index 0000000..a4003cc
--- /dev/null
+++ b/flink-table/flink-sql-parser/pom.xml
@@ -0,0 +1,297 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-table</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.9-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-sql-parser</artifactId>
+ <name>flink-sql-parser</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <!-- override parent pom -->
+ <test.excludedGroups/>
+ <calcite.version>1.19.0</calcite.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ <!-- When updating the Calcite version, make sure to update the dependency exclusions -->
+ <version>${calcite.version}</version>
+ <exclusions>
+ <!--
+
+ Dependencies that are not needed for how we use Calcite right now.
+
+ "mvn dependency:tree" as of Calcite 1.19:
+
+ [INFO] +- org.apache.calcite:calcite-core:jar:1.19.0:compile
+ [INFO] | +- org.apache.calcite.avatica:avatica-core:jar:1.13.0:compile
+ [INFO] | \- com.google.guava:guava:jar:19.0:compile
+
+ -->
+ <exclusion>
+ <groupId>org.apache.calcite.avatica</groupId>
+ <artifactId>avatica-metrics</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-dbcp2</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.esri.geometry</groupId>
+ <artifactId>esri-geometry-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.yahoo.datasketches</groupId>
+ <artifactId>sketches-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.hydromatic</groupId>
+ <artifactId>aggdesigner-algorithm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-linq4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>janino</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>commons-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ <version>${calcite.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <!-- Extract parser grammar template from calcite-core.jar and put
+ it under ${project.build.directory} where all freemarker templates are. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.8</version>
+ <executions>
+ <execution>
+ <id>unpack-parser-template</id>
+ <phase>initialize</phase>
+ <goals>
+ <goal>unpack</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+ <outputDirectory>${project.build.directory}/</outputDirectory>
+ <includes>**/Parser.jj</includes>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- adding fmpp code gen -->
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-fmpp-resources</id>
+ <phase>initialize</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/codegen</outputDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/codegen</directory>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+   <!-- Expands the FreeMarker templates copied to ${project.build.directory}/codegen into
+        ${project.build.directory}/generated-sources, the directory the javacc and
+        build-helper plugins of this build read from. -->
+   <groupId>com.googlecode.fmpp-maven-plugin</groupId>
+   <artifactId>fmpp-maven-plugin</artifactId>
+   <version>1.0</version>
+   <dependencies>
+     <dependency>
+       <groupId>org.freemarker</groupId>
+       <artifactId>freemarker</artifactId>
+       <version>2.3.25-incubating</version>
+     </dependency>
+   </dependencies>
+   <executions>
+     <execution>
+       <id>generate-fmpp-sources</id>
+       <phase>generate-sources</phase>
+       <goals>
+         <goal>generate</goal>
+       </goals>
+       <configuration>
+         <cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
+         <!-- Same location the other plugins use; avoids hard-coding the "target" directory name. -->
+         <outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
+         <templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
+       </configuration>
+     </execution>
+   </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>add-generated-sources</id>
+ <phase>process-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.build.directory}/generated-sources</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>javacc-maven-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <id>javacc</id>
+ <goals>
+ <goal>javacc</goal>
+ </goals>
+ <configuration>
+ <sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
+ <includes>
+ <include>**/Parser.jj</include>
+ </includes>
+ <!-- This must be kept synced with Apache Calcite. -->
+ <lookAhead>2</lookAhead>
+ <isStatic>false</isStatic>
+ <outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <forkCount>1</forkCount>
+ <reuseForks>false</reuseForks>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/flink-table/flink-sql-parser/src/main/codegen/config.fmpp b/flink-table/flink-sql-parser/src/main/codegen/config.fmpp
new file mode 100644
index 0000000..1d5c8e7
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/codegen/config.fmpp
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is an FMPP (http://fmpp.sourceforge.net/) configuration file to
+# allow clients to extend Calcite's SQL parser to support application specific
+# SQL statements, literals or data types.
+#
+# Calcite's parser grammar file (Parser.jj) is written in javacc
+# (https://javacc.org/) with Freemarker (http://freemarker.org/) variables
+# to allow clients to:
+# 1. have custom parser implementation class and package name.
+# 2. insert new parser method implementations written in javacc to parse
+# custom:
+# a) SQL statements.
+# b) literals.
+# c) data types.
+# 3. add new keywords to support custom SQL constructs added as part of (2).
+# 4. add import statements needed by inserted custom parser implementations.
+#
+# Parser template file (Parser.jj) along with this file are packaged as
+# part of the calcite-core-<version>.jar under "codegen" directory.
+
+data: {
+ parser: tdd(../data/Parser.tdd)
+}
+
+freemarkerLinks: {
+ includes: includes/
+}
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
new file mode 100644
index 0000000..c026d8a
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -0,0 +1,430 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+ # Generated parser implementation package and class name.
+ package: "org.apache.flink.sql.parser.impl",
+ class: "FlinkSqlParserImpl",
+
+  # List of additional classes and packages to import.
+  # Example. "org.apache.calcite.sql.*", "java.util.List".
+  # Every entry is terminated with a comma except the last one.
+  imports: [
+    "org.apache.flink.sql.parser.ddl.SqlCreateTable",
+    "org.apache.flink.sql.parser.ddl.SqlDropTable",
+    "org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext",
+    "org.apache.flink.sql.parser.ddl.SqlTableColumn",
+    "org.apache.flink.sql.parser.ddl.SqlArrayType",
+    "org.apache.flink.sql.parser.ddl.SqlMapType",
+    "org.apache.flink.sql.parser.ddl.SqlRowType",
+    "org.apache.flink.sql.parser.utils.SqlTimeUnit",
+    "org.apache.flink.sql.parser.SqlProperty",
+    "org.apache.calcite.sql.SqlDrop",
+    "org.apache.calcite.sql.SqlCreate",
+    "java.util.List",
+    "java.util.ArrayList"
+  ]
+
+ # List of new keywords. Example: "DATABASES", "TABLES". If the keyword is not a reserved
+ # keyword, please also add it to 'nonReservedKeywords' section.
+ keywords: [
+ "COMMENT",
+ "PARTITIONED",
+ "IF",
+ "WATERMARK",
+ "ASCENDING",
+ "FROM_SOURCE",
+ "BOUNDED",
+ "DELAY"
+ ]
+
+ # List of keywords from "keywords" section that are not reserved.
+ nonReservedKeywords: [
+ "A"
+ "ABSENT"
+ "ABSOLUTE"
+ "ACTION"
+ "ADA"
+ "ADD"
+ "ADMIN"
+ "AFTER"
+ "ALWAYS"
+ "APPLY"
+ "ASC"
+ "ASSERTION"
+ "ASSIGNMENT"
+ "ATTRIBUTE"
+ "ATTRIBUTES"
+ "BEFORE"
+ "BERNOULLI"
+ "BREADTH"
+ "C"
+ "CASCADE"
+ "CATALOG"
+ "CATALOG_NAME"
+ "CENTURY"
+ "CHAIN"
+ "CHARACTER_SET_CATALOG"
+ "CHARACTER_SET_NAME"
+ "CHARACTER_SET_SCHEMA"
+ "CHARACTERISTICS"
+ "CHARACTERS"
+ "CLASS_ORIGIN"
+ "COBOL"
+ "COLLATION"
+ "COLLATION_CATALOG"
+ "COLLATION_NAME"
+ "COLLATION_SCHEMA"
+ "COLUMN_NAME"
+ "COMMAND_FUNCTION"
+ "COMMAND_FUNCTION_CODE"
+ "COMMITTED"
+ "CONDITION_NUMBER"
+ "CONDITIONAL"
+ "CONNECTION"
+ "CONNECTION_NAME"
+ "CONSTRAINT_CATALOG"
+ "CONSTRAINT_NAME"
+ "CONSTRAINT_SCHEMA"
+ "CONSTRAINTS"
+ "CONSTRUCTOR"
+ "CONTINUE"
+ "CURSOR_NAME"
+ "DATA"
+ "DATABASE"
+ "DATETIME_INTERVAL_CODE"
+ "DATETIME_INTERVAL_PRECISION"
+ "DECADE"
+ "DEFAULTS"
+ "DEFERRABLE"
+ "DEFERRED"
+ "DEFINED"
+ "DEFINER"
+ "DEGREE"
+ "DEPTH"
+ "DERIVED"
+ "DESC"
+ "DESCRIPTION"
+ "DESCRIPTOR"
+ "DIAGNOSTICS"
+ "DISPATCH"
+ "DOMAIN"
+ "DOW"
+ "DOY"
+ "DYNAMIC_FUNCTION"
+ "DYNAMIC_FUNCTION_CODE"
+ "ENCODING"
+ "EPOCH"
+ "ERROR"
+ "EXCEPTION"
+ "EXCLUDE"
+ "EXCLUDING"
+ "FINAL"
+ "FIRST"
+ "FOLLOWING"
+ "FORMAT"
+ "FORTRAN"
+ "FOUND"
+ "FRAC_SECOND"
+ "G"
+ "GENERAL"
+ "GENERATED"
+ "GEOMETRY"
+ "GO"
+ "GOTO"
+ "GRANTED"
+ "HIERARCHY"
+ "IMMEDIATE"
+ "IMMEDIATELY"
+ "IMPLEMENTATION"
+ "INCLUDING"
+ "INCREMENT"
+ "INITIALLY"
+ "INPUT"
+ "INSTANCE"
+ "INSTANTIABLE"
+ "INVOKER"
+ "ISODOW"
+ "ISOYEAR"
+ "ISOLATION"
+ "JAVA"
+ "JSON"
+ "JSON_TYPE"
+ "JSON_DEPTH"
+ "JSON_PRETTY"
+ "K"
+ "KEY"
+ "KEY_MEMBER"
+ "KEY_TYPE"
+ "LABEL"
+ "LAST"
+ "LENGTH"
+ "LEVEL"
+ "LIBRARY"
+ "LOCATOR"
+ "M"
+ "MAP"
+ "MATCHED"
+ "MAXVALUE"
+ "MICROSECOND"
+ "MESSAGE_LENGTH"
+ "MESSAGE_OCTET_LENGTH"
+ "MESSAGE_TEXT"
+ "MILLISECOND"
+ "MILLENNIUM"
+ "MINVALUE"
+ "MORE_"
+ "MUMPS"
+ "NAME"
+ "NAMES"
+ "NANOSECOND"
+ "NESTING"
+ "NORMALIZED"
+ "NULLABLE"
+ "NULLS"
+ "NUMBER"
+ "OBJECT"
+ "OCTETS"
+ "OPTION"
+ "OPTIONS"
+ "ORDERING"
+ "ORDINALITY"
+ "OTHERS"
+ "OUTPUT"
+ "OVERRIDING"
+ "PAD"
+ "PARAMETER_MODE"
+ "PARAMETER_NAME"
+ "PARAMETER_ORDINAL_POSITION"
+ "PARAMETER_SPECIFIC_CATALOG"
+ "PARAMETER_SPECIFIC_NAME"
+ "PARAMETER_SPECIFIC_SCHEMA"
+ "PARTIAL"
+ "PASCAL"
+ "PASSING"
+ "PASSTHROUGH"
+ "PAST"
+ "PATH"
+ "PLACING"
+ "PLAN"
+ "PLI"
+ "PRECEDING"
+ "PRESERVE"
+ "PRIOR"
+ "PRIVILEGES"
+ "PUBLIC"
+ "QUARTER"
+ "READ"
+ "RELATIVE"
+ "REPEATABLE"
+ "REPLACE"
+ "RESTART"
+ "RESTRICT"
+ "RETURNED_CARDINALITY"
+ "RETURNED_LENGTH"
+ "RETURNED_OCTET_LENGTH"
+ "RETURNED_SQLSTATE"
+ "RETURNING"
+ "ROLE"
+ "ROUTINE"
+ "ROUTINE_CATALOG"
+ "ROUTINE_NAME"
+ "ROUTINE_SCHEMA"
+ "ROW_COUNT"
+ "SCALAR"
+ "SCALE"
+ "SCHEMA"
+ "SCHEMA_NAME"
+ "SCOPE_CATALOGS"
+ "SCOPE_NAME"
+ "SCOPE_SCHEMA"
+ "SECTION"
+ "SECURITY"
+ "SELF"
+ "SEQUENCE"
+ "SERIALIZABLE"
+ "SERVER"
+ "SERVER_NAME"
+ "SESSION"
+ "SETS"
+ "SIMPLE"
+ "SIZE"
+ "SOURCE"
+ "SPACE"
+ "SPECIFIC_NAME"
+ "SQL_BIGINT"
+ "SQL_BINARY"
+ "SQL_BIT"
+ "SQL_BLOB"
+ "SQL_BOOLEAN"
+ "SQL_CHAR"
+ "SQL_CLOB"
+ "SQL_DATE"
+ "SQL_DECIMAL"
+ "SQL_DOUBLE"
+ "SQL_FLOAT"
+ "SQL_INTEGER"
+ "SQL_INTERVAL_DAY"
+ "SQL_INTERVAL_DAY_TO_HOUR"
+ "SQL_INTERVAL_DAY_TO_MINUTE"
+ "SQL_INTERVAL_DAY_TO_SECOND"
+ "SQL_INTERVAL_HOUR"
+ "SQL_INTERVAL_HOUR_TO_MINUTE"
+ "SQL_INTERVAL_HOUR_TO_SECOND"
+ "SQL_INTERVAL_MINUTE"
+ "SQL_INTERVAL_MINUTE_TO_SECOND"
+ "SQL_INTERVAL_MONTH"
+ "SQL_INTERVAL_SECOND"
+ "SQL_INTERVAL_YEAR"
+ "SQL_INTERVAL_YEAR_TO_MONTH"
+ "SQL_LONGVARBINARY"
+ "SQL_LONGVARNCHAR"
+ "SQL_LONGVARCHAR"
+ "SQL_NCHAR"
+ "SQL_NCLOB"
+ "SQL_NUMERIC"
+ "SQL_NVARCHAR"
+ "SQL_REAL"
+ "SQL_SMALLINT"
+ "SQL_TIME"
+ "SQL_TIMESTAMP"
+ "SQL_TINYINT"
+ "SQL_TSI_DAY"
+ "SQL_TSI_FRAC_SECOND"
+ "SQL_TSI_HOUR"
+ "SQL_TSI_MICROSECOND"
+ "SQL_TSI_MINUTE"
+ "SQL_TSI_MONTH"
+ "SQL_TSI_QUARTER"
+ "SQL_TSI_SECOND"
+ "SQL_TSI_WEEK"
+ "SQL_TSI_YEAR"
+ "SQL_VARBINARY"
+ "SQL_VARCHAR"
+ "STATE"
+ "STATEMENT"
+ "STRUCTURE"
+ "STYLE"
+ "SUBCLASS_ORIGIN"
+ "SUBSTITUTE"
+ "TABLE_NAME"
+ "TEMPORARY"
+ "TIES"
+ "TIMESTAMPADD"
+ "TIMESTAMPDIFF"
+ "TOP_LEVEL_COUNT"
+ "TRANSACTION"
+ "TRANSACTIONS_ACTIVE"
+ "TRANSACTIONS_COMMITTED"
+ "TRANSACTIONS_ROLLED_BACK"
+ "TRANSFORM"
+ "TRANSFORMS"
+ "TRIGGER_CATALOG"
+ "TRIGGER_NAME"
+ "TRIGGER_SCHEMA"
+ "TYPE"
+ "UNBOUNDED"
+ "UNCOMMITTED"
+ "UNCONDITIONAL"
+ "UNDER"
+ "UNNAMED"
+ "USAGE"
+ "USER_DEFINED_TYPE_CATALOG"
+ "USER_DEFINED_TYPE_CODE"
+ "USER_DEFINED_TYPE_NAME"
+ "USER_DEFINED_TYPE_SCHEMA"
+ "UTF8"
+ "UTF16"
+ "UTF32"
+ "VERSION"
+ "VIEW"
+ "WEEK"
+ "WRAPPER"
+ "WORK"
+ "WRITE"
+ "XML"
+ "ZONE"
+
+ # not in core, added in Flink
+ "PARTITIONED",
+ "IF",
+ "ASCENDING",
+ "FROM_SOURCE",
+ "BOUNDED",
+ "DELAY"
+ ]
+
+ # List of methods for parsing custom SQL statements.
+ # Return type of method implementation should be 'SqlNode'.
+ # Example: SqlShowDatabases(), SqlShowTables().
+ statementParserMethods: [
+ ]
+
+ # List of methods for parsing custom literals.
+ # Return type of method implementation should be "SqlNode".
+ # Example: ParseJsonLiteral().
+ literalParserMethods: [
+ ]
+
+ # List of methods for parsing custom data types.
+ # Return type of method implementation should be "SqlIdentifier".
+ # Example: SqlParseTimeStampZ().
+ dataTypeParserMethods: [
+ "SqlArrayType()",
+ "SqlMapType()",
+ "SqlRowType()"
+ ]
+
+ # List of methods for parsing builtin function calls.
+ # Return type of method implementation should be "SqlNode".
+ # Example: DateFunctionCall().
+ builtinFunctionCallMethods: [
+ ]
+
+ # List of methods for parsing extensions to "ALTER <scope>" calls.
+ # Each must accept arguments "(SqlParserPos pos, String scope)".
+ # Example: "SqlUploadJarNode"
+ alterStatementParserMethods: [
+ ]
+
+ # List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
+ # Each must accept arguments "(Span s, boolean replace)".
+ createStatementParserMethods: [
+ "SqlCreateTable"
+ ]
+
+ # List of methods for parsing extensions to "DROP" calls.
+ # Each must accept arguments "(Span s, boolean replace)".
+ dropStatementParserMethods: [
+ "SqlDropTable"
+ ]
+
+ # List of files in @includes directory that have parser method
+ # implementations for parsing custom SQL statements, literals or types
+ # given as part of "statementParserMethods", "literalParserMethods" or
+ # "dataTypeParserMethods".
+ implementationFiles: [
+ "parserImpls.ftl"
+ ]
+
+ # List of additional join types. Each is a method with no arguments.
+ # Example: LeftSemiJoin()
+ joinTypes: [
+ ]
+
+ includeCompoundIdentifier: true
+ includeBraces: true
+ includeAdditionalDeclarations: false
+}
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/compoundIdentifier.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/compoundIdentifier.ftl
new file mode 100644
index 0000000..70db3c2
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/compoundIdentifier.ftl
@@ -0,0 +1,34 @@
+<#--
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to you under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+-->
+
+<#--
+ Add implementations of additional parser statements, literals or
+ data types.
+
+ Example of SqlShowTables() implementation:
+ SqlNode SqlShowTables()
+ {
+ ...local variables...
+ }
+ {
+ <SHOW> <TABLES>
+ ...
+ {
+ return SqlShowTables(...)
+ }
+ }
+-->
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
new file mode 100644
index 0000000..92607d5
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -0,0 +1,289 @@
+<#--
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to you under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+-->
+
+// Parses one element of the parenthesized element list of CREATE TABLE and records it in the
+// given context. An element is one of: a regular column definition, a PRIMARY KEY clause,
+// a UNIQUE clause, or a computed column.
+void TableColumn(TableCreationContext context) :
+{
+}
+{
+ (
+ TableColumn2(context.columnList)
+ |
+ context.primaryKeyList = PrimaryKey()
+ |
+ UniqueKey(context.uniqueKeysList)
+ |
+ ComputedColumn(context)
+ )
+}
+
+// Parses a computed column of the form
+//   identifier AS expression
+// and appends it to the context's column list as an AS call whose operands are
+// (expression, identifier).
+void ComputedColumn(TableCreationContext context) :
+{
+    SqlNode identifier;
+    SqlNode expr;
+}
+{
+    identifier = SimpleIdentifier()
+    <AS>
+    expr = Expression(ExprContext.ACCEPT_SUB_QUERY) {
+        // The call's position spans both the identifier and the expression.
+        expr = SqlStdOperatorTable.AS.createCall(Span.of(identifier, expr).pos(), expr, identifier);
+        context.columnList.add(expr);
+    }
+}
+
+// Parses a regular column definition of the form
+//   name type [ NULL | NOT NULL ] [ COMMENT 'text' ]
+// and appends the resulting SqlTableColumn to the given list. A column without an explicit
+// nullability clause is treated as nullable.
+void TableColumn2(List<SqlNode> list) :
+{
+    SqlIdentifier name;
+    SqlDataTypeSpec type;
+    SqlCharStringLiteral comment = null;
+}
+{
+    name = SimpleIdentifier()
+    type = DataType()
+    (
+        <NULL> { type = type.withNullable(true); }
+    |
+        <NOT> <NULL> { type = type.withNullable(false); }
+    |
+        // No nullability clause given: same result as an explicit NULL.
+        { type = type.withNullable(true); }
+    )
+    [ <COMMENT> <QUOTED_STRING> {
+        String p = SqlParserUtil.parseString(token.image);
+        comment = SqlLiteral.createCharString(p, getPos());
+    }]
+    {
+        SqlTableColumn tableColumn = new SqlTableColumn(name, type, comment, getPos());
+        list.add(tableColumn);
+    }
+}
+
+// Parses a clause of the form
+//   PRIMARY KEY ( column [, column]* )
+// and returns the listed column identifiers as a SqlNodeList. The list position combines the
+// position taken at the PRIMARY token with the position taken after the closing parenthesis.
+SqlNodeList PrimaryKey() :
+{
+    List<SqlNode> keyColumns = new ArrayList<SqlNode>();
+    SqlParserPos startPos;
+    SqlIdentifier column;
+}
+{
+    <PRIMARY> { startPos = getPos(); }
+    <KEY>
+    <LPAREN>
+    column = SimpleIdentifier() { keyColumns.add(column); }
+    (
+        <COMMA> column = SimpleIdentifier() { keyColumns.add(column); }
+    )*
+    <RPAREN>
+    {
+        return new SqlNodeList(keyColumns, startPos.plus(getPos()));
+    }
+}
+
+// Parses a clause of the form
+//   UNIQUE ( column [, column]* )
+// and appends the listed column identifiers, wrapped in one SqlNodeList, to the given list.
+// The node list position combines the position taken at the UNIQUE token with the position
+// taken after the closing parenthesis.
+void UniqueKey(List<SqlNodeList> list) :
+{
+    List<SqlNode> keyColumns = new ArrayList<SqlNode>();
+    SqlParserPos startPos;
+    SqlIdentifier column;
+}
+{
+    <UNIQUE> { startPos = getPos(); }
+    <LPAREN>
+    column = SimpleIdentifier() { keyColumns.add(column); }
+    (
+        <COMMA> column = SimpleIdentifier() { keyColumns.add(column); }
+    )*
+    <RPAREN>
+    {
+        list.add(new SqlNodeList(keyColumns, startPos.plus(getPos())));
+    }
+}
+
+// Parses a single property of the form
+//   key = 'value'
+// where the key is a compound identifier and the value is a string literal, and returns it
+// as a SqlProperty. The node position is the one reported by getPos() once the value
+// literal has been parsed.
+SqlNode PropertyValue() :
+{
+    SqlIdentifier key;
+    SqlNode value;
+}
+{
+    key = CompoundIdentifier()
+    <EQ> value = StringLiteral()
+    {
+        return new SqlProperty(key, value, getPos());
+    }
+}
+
+// Parses the part of a CREATE TABLE statement that starts at the TABLE keyword:
+//   TABLE name [ "(" element ("," element)* ")" ]
+//   [ COMMENT 'text' ]
+//   [ PARTITIONED BY "(" [ column ("," column)* ] ")" ]
+//   [ WITH "(" [ property ("," property)* ] ")" ]
+// and returns the resulting SqlCreateTable node. Clauses that are absent leave their
+// corresponding constructor argument at its initial value (null, or SqlNodeList.EMPTY for
+// the column list). The "replace" argument is not read by this production.
+SqlCreate SqlCreateTable(Span s, boolean replace) :
+{
+ final SqlParserPos startPos = s.pos();
+ SqlIdentifier tableName;
+ SqlNodeList primaryKeyList = null;
+ List<SqlNodeList> uniqueKeysList = null;
+ SqlNodeList columnList = SqlNodeList.EMPTY;
+ SqlCharStringLiteral comment = null;
+
+ SqlNodeList propertyList = null;
+ SqlNodeList partitionColumns = null;
+ SqlParserPos pos = startPos;
+}
+{
+ <TABLE>
+
+ tableName = CompoundIdentifier()
+ // Optional element list; the parsed elements are collected in a TableCreationContext.
+ [
+ <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
+ TableColumn(ctx)
+ (
+ <COMMA> TableColumn(ctx)
+ )*
+ {
+ // Note: this action runs before the closing parenthesis is consumed.
+ pos = pos.plus(getPos());
+ columnList = new SqlNodeList(ctx.columnList, pos);
+ primaryKeyList = ctx.primaryKeyList;
+ uniqueKeysList = ctx.uniqueKeysList;
+ }
+ <RPAREN>
+ ]
+ // Optional table comment.
+ [ <COMMENT> <QUOTED_STRING> {
+ String p = SqlParserUtil.parseString(token.image);
+ comment = SqlLiteral.createCharString(p, getPos());
+ }]
+ // Optional partition columns; the list between the parentheses may be empty.
+ [
+ <PARTITIONED> <BY>
+ {
+ SqlNode column;
+ List<SqlNode> partitionKey = new ArrayList<SqlNode>();
+ pos = getPos();
+
+ }
+ <LPAREN>
+ [
+ column = SimpleIdentifier()
+ {
+ partitionKey.add(column);
+ }
+ (
+ <COMMA> column = SimpleIdentifier()
+ {
+ partitionKey.add(column);
+ }
+ )*
+ ]
+ <RPAREN>
+ {
+ partitionColumns = new SqlNodeList(partitionKey, pos.plus(getPos()));
+ }
+ ]
+ // Optional table properties; the list between the parentheses may be empty.
+ [
+ <WITH>
+ {
+ SqlNode property;
+ List<SqlNode> proList = new ArrayList<SqlNode>();
+ pos = getPos();
+ }
+ <LPAREN>
+ [
+ property = PropertyValue()
+ {
+ proList.add(property);
+ }
+ (
+ <COMMA> property = PropertyValue()
+ {
+ proList.add(property);
+ }
+ )*
+ ]
+ <RPAREN>
+ { propertyList = new SqlNodeList(proList, pos.plus(getPos())); }
+ ]
+
+ {
+ return new SqlCreateTable(startPos.plus(getPos()),
+ tableName,
+ columnList,
+ primaryKeyList,
+ uniqueKeysList,
+ propertyList,
+ partitionColumns,
+ comment);
+ }
+}
+
+// Parses the part of a DROP TABLE statement that starts at the TABLE keyword:
+//   TABLE [ IF EXISTS ] name
+// and returns the resulting SqlDropTable node, positioned at s.pos(). The "replace"
+// argument is not read by this production.
+SqlDrop SqlDropTable(Span s, boolean replace) :
+{
+    boolean existsCheck = false;
+    SqlIdentifier name = null;
+}
+{
+    <TABLE>
+    [
+        <IF> <EXISTS> { existsCheck = true; }
+    ]
+    name = CompoundIdentifier()
+    {
+        return new SqlDropTable(s.pos(), name, existsCheck);
+    }
+}
+
+// Parses an array type written as ARRAY, "<", element type, ">" and returns a SqlArrayType
+// built from the position taken at the ARRAY token and the parsed element type.
+SqlIdentifier SqlArrayType() :
+{
+    SqlParserPos arrayPos;
+    SqlDataTypeSpec element;
+}
+{
+    <ARRAY> { arrayPos = getPos(); }
+    <LT>
+    element = DataType()
+    <GT>
+    {
+        return new SqlArrayType(arrayPos, element);
+    }
+}
+
+// Parses a map type written as MAP, "<", key type, ",", value type, ">" and returns a
+// SqlMapType built from the position taken at the MAP token and the two parsed types.
+SqlIdentifier SqlMapType() :
+{
+    SqlParserPos mapPos;
+    SqlDataTypeSpec key;
+    SqlDataTypeSpec value;
+}
+{
+    <MAP> { mapPos = getPos(); }
+    <LT>
+    key = DataType()
+    <COMMA>
+    value = DataType()
+    <GT>
+    {
+        return new SqlMapType(mapPos, key, value);
+    }
+}
+
+// Parses a row type written as ROW, "<", one or more comma-separated "name : type" fields,
+// ">" and returns a SqlRowType built from the position taken at the ROW token and the
+// field names and field types in declaration order.
+SqlIdentifier SqlRowType() :
+{
+    SqlParserPos rowPos;
+    SqlIdentifier fieldName;
+    SqlDataTypeSpec fieldType;
+    List<SqlIdentifier> names = new ArrayList<SqlIdentifier>();
+    List<SqlDataTypeSpec> types = new ArrayList<SqlDataTypeSpec>();
+}
+{
+    <ROW> { rowPos = getPos(); }
+    <LT>
+    fieldName = SimpleIdentifier() <COLON> fieldType = DataType()
+    {
+        names.add(fieldName);
+        types.add(fieldType);
+    }
+    (
+        <COMMA>
+        fieldName = SimpleIdentifier() <COLON> fieldType = DataType()
+        {
+            names.add(fieldName);
+            types.add(fieldType);
+        }
+    )*
+    <GT>
+    {
+        return new SqlRowType(rowPos, names, types);
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlProperty.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlProperty.java
new file mode 100644
index 0000000..dbb58e6
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlProperty.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A single {@code key = value} entry of a DDL property list.
+ *
+ * <p>The key is an identifier and the value is an arbitrary node. Note that
+ * {@link #getValueString()} casts the literal value of the node to {@link NlsString}.
+ */
+public class SqlProperty extends SqlCall {
+
+    /** Use this operator only if you don't have a better one. */
+    protected static final SqlOperator OPERATOR =
+        new SqlSpecialOperator("Property", SqlKind.OTHER);
+
+    private final SqlIdentifier key;
+    private final SqlNode value;
+
+    /**
+     * Creates a property.
+     *
+     * @param key property key, must not be null
+     * @param value property value, must not be null
+     * @param pos parser position handed to the {@link SqlCall} constructor
+     */
+    public SqlProperty(SqlIdentifier key, SqlNode value, SqlParserPos pos) {
+        super(pos);
+        this.key = requireNonNull(key, "Property key is missing");
+        this.value = requireNonNull(value, "Property value is missing");
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(key, value);
+    }
+
+    /** Writes the property as the key, the keyword {@code =} and the value. */
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        key.unparse(writer, leftPrec, rightPrec);
+        writer.keyword("=");
+        value.unparse(writer, leftPrec, rightPrec);
+    }
+
+    public SqlIdentifier getKey() {
+        return key;
+    }
+
+    public SqlNode getValue() {
+        return value;
+    }
+
+    /** Returns the string form of the key identifier. */
+    public String getKeyString() {
+        return key.toString();
+    }
+
+    /** Returns the string carried by the value literal. */
+    public String getValueString() {
+        NlsString literalValue = (NlsString) SqlLiteral.value(value);
+        return literalValue.getValue();
+    }
+}
+
+// End SqlProperty.java
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/ExtendedSqlType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/ExtendedSqlType.java
new file mode 100644
index 0000000..135dc49
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/ExtendedSqlType.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlWriter;
+
+/**
+ * A marker interface to be implemented by the supported sql types which are not supported
+ * by the Calcite default parser.
+ *
+ * <p>Caution that the subclass must override the method
+ * {@link org.apache.calcite.sql.SqlNode#unparse(SqlWriter, int, int)}.
+ */
+public interface ExtendedSqlType {
+
+    /**
+     * Unparses a type specification, giving extended types the chance to print themselves.
+     *
+     * <p>When the type name of {@code type} is an {@link ExtendedSqlType}, the type name is
+     * unparsed; otherwise the type specification itself is unparsed.
+     *
+     * @param type      type specification to write
+     * @param writer    target writer
+     * @param leftPrec  left precedence, passed through unchanged
+     * @param rightPrec right precedence, passed through unchanged
+     */
+    static void unparseType(
+            SqlDataTypeSpec type,
+            SqlWriter writer,
+            int leftPrec,
+            int rightPrec) {
+        final boolean extended = type.getTypeName() instanceof ExtendedSqlType;
+        if (!extended) {
+            type.unparse(writer, leftPrec, rightPrec);
+            return;
+        }
+        type.getTypeName().unparse(writer, leftPrec, rightPrec);
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlArrayType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlArrayType.java
new file mode 100644
index 0000000..7d43d4f
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlArrayType.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Extended Flink array type: an identifier named after {@link SqlTypeName#ARRAY} that also
+ * carries the type specification of its elements.
+ */
+public class SqlArrayType extends SqlIdentifier implements ExtendedSqlType {
+
+    private final SqlDataTypeSpec elementType;
+
+    /**
+     * Creates an array type.
+     *
+     * @param pos         parser position of the type
+     * @param elementType type specification of the array elements
+     */
+    public SqlArrayType(SqlParserPos pos, SqlDataTypeSpec elementType) {
+        super(SqlTypeName.ARRAY.getName(), pos);
+        this.elementType = elementType;
+    }
+
+    /** Returns the type specification of the array elements. */
+    public SqlDataTypeSpec getElementType() {
+        return this.elementType;
+    }
+
+    /** Writes the element type surrounded by {@code ARRAY<} and {@code >}. */
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("ARRAY<");
+        ExtendedSqlType.unparseType(elementType, writer, leftPrec, rightPrec);
+        writer.keyword(">");
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlColumnType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlColumnType.java
new file mode 100644
index 0000000..3e494a72
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlColumnType.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+/**
+ * All supported data types in DDL. Used for Create Table DDL validation.
+ *
+ * <p>{@link #UNSUPPORTED} is the value returned for every type name that has no constant here.
+ */
+public enum SqlColumnType {
+    BOOLEAN,
+    TINYINT,
+    SMALLINT,
+    INT,
+    INTEGER,
+    BIGINT,
+    REAL,
+    FLOAT,
+    DOUBLE,
+    DECIMAL,
+    DATE,
+    TIME,
+    TIMESTAMP,
+    VARCHAR,
+    VARBINARY,
+    ANY,
+    ARRAY,
+    MAP,
+    ROW,
+    UNSUPPORTED;
+
+    /**
+     * Returns the column type with the given string representation.
+     *
+     * <p>The lookup is case-insensitive and independent of the JVM default locale.
+     *
+     * @param type type name, may be null
+     * @return the matching constant, or {@link #UNSUPPORTED} if {@code type} is null or does
+     *         not name a constant of this enum
+     */
+    public static SqlColumnType getType(String type) {
+        if (type == null) {
+            return UNSUPPORTED;
+        }
+        try {
+            // Locale.ROOT: with the default locale, e.g. Turkish upper-cases "int" to "İNT",
+            // which would wrongly be reported as unsupported.
+            return SqlColumnType.valueOf(type.toUpperCase(java.util.Locale.ROOT));
+        } catch (IllegalArgumentException ignored) {
+            // valueOf found no constant with that name: the type is not supported.
+            return UNSUPPORTED;
+        }
+    }
+
+    /** Returns true if this type is unsupported. **/
+    public boolean isUnsupported() {
+        return this == UNSUPPORTED;
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
new file mode 100644
index 0000000..47773cb
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.error.SqlParseException;
+
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.dialect.AnsiSqlDialect;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.pretty.SqlPrettyWriter;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * CREATE TABLE DDL sql call.
+ *
+ * <p>Holds the table name, the column list, the optional primary key, unique keys, partition
+ * keys and properties, and an optional table comment. Entries of the column list are either
+ * {@link SqlTableColumn}s or, for computed columns, {@link SqlBasicCall}s.
+ */
+public class SqlCreateTable extends SqlCreate {
+
+    public static final SqlSpecialOperator OPERATOR =
+        new SqlSpecialOperator("CREATE TABLE", SqlKind.CREATE_TABLE);
+
+    private final SqlIdentifier tableName;
+
+    private final SqlNodeList columnList;
+
+    private final SqlNodeList propertyList;
+
+    private final SqlNodeList primaryKeyList;
+
+    private final List<SqlNodeList> uniqueKeysList;
+
+    private final SqlNodeList partitionKeyList;
+
+    private final SqlCharStringLiteral comment;
+
+    /**
+     * Creates a CREATE TABLE call.
+     *
+     * @param pos              parser position of the statement
+     * @param tableName        name of the table, must not be null
+     * @param columnList       column definitions, must not be null
+     * @param primaryKeyList   primary key columns, may be null
+     * @param uniqueKeysList   one node list per UNIQUE constraint, may be null
+     * @param propertyList     table properties, may be null
+     * @param partitionKeyList partition columns, may be null
+     * @param comment          table comment, may be null
+     * @throws NullPointerException if {@code tableName} or {@code columnList} is null
+     */
+    public SqlCreateTable(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList columnList,
+            SqlNodeList primaryKeyList,
+            List<SqlNodeList> uniqueKeysList,
+            SqlNodeList propertyList,
+            SqlNodeList partitionKeyList,
+            SqlCharStringLiteral comment) {
+        super(OPERATOR, pos, false, false);
+        this.tableName = requireNonNull(tableName, "Table name is missing");
+        this.columnList = requireNonNull(columnList, "Column list should not be null");
+        this.primaryKeyList = primaryKeyList;
+        this.uniqueKeysList = uniqueKeysList;
+        this.propertyList = propertyList;
+        this.partitionKeyList = partitionKeyList;
+        this.comment = comment;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    /**
+     * Returns the node operands of this call; absent optional parts appear as null entries.
+     *
+     * <p>Returning a real list instead of null keeps generic {@link SqlCall} traversal
+     * working. The unique keys are held in a plain {@link List} rather than a
+     * {@link SqlNode} and are therefore not part of the operands.
+     */
+    @Override
+    public List<SqlNode> getOperandList() {
+        // Fully qualified so that the file's import block does not need to change.
+        return org.apache.calcite.util.ImmutableNullableList.of(
+            tableName, columnList, primaryKeyList, propertyList, partitionKeyList, comment);
+    }
+
+    public SqlIdentifier getTableName() {
+        return tableName;
+    }
+
+    public SqlNodeList getColumnList() {
+        return columnList;
+    }
+
+    public SqlNodeList getPropertyList() {
+        return propertyList;
+    }
+
+    public SqlNodeList getPartitionKeyList() {
+        return partitionKeyList;
+    }
+
+    public SqlNodeList getPrimaryKeyList() {
+        return primaryKeyList;
+    }
+
+    public List<SqlNodeList> getUniqueKeysList() {
+        return uniqueKeysList;
+    }
+
+    public SqlCharStringLiteral getComment() {
+        return comment;
+    }
+
+    /**
+     * Validates this statement.
+     *
+     * <p>Checks that every {@link SqlTableColumn} has a type known to {@link SqlColumnType},
+     * that no column name occurs twice, and that every primary key, unique key and partition
+     * column refers to a declared column.
+     *
+     * @throws SqlParseException on the first violation found, positioned at the offending node
+     */
+    public void validate() throws SqlParseException {
+        Set<String> columnNames = new HashSet<>();
+        if (columnList != null) {
+            for (SqlNode column : columnList) {
+                String columnName = null;
+                if (column instanceof SqlTableColumn) {
+                    SqlTableColumn tableColumn = (SqlTableColumn) column;
+                    columnName = tableColumn.getName().getSimple();
+                    String typeName = tableColumn.getType().getTypeName().getSimple();
+                    if (SqlColumnType.getType(typeName).isUnsupported()) {
+                        throw new SqlParseException(
+                            column.getParserPosition(),
+                            "Not support type [" + typeName + "], at " + column.getParserPosition());
+                    }
+                } else if (column instanceof SqlBasicCall) {
+                    // Computed column: the second operand carries the column name.
+                    SqlBasicCall tableColumn = (SqlBasicCall) column;
+                    columnName = tableColumn.getOperands()[1].toString();
+                }
+
+                if (!columnNames.add(columnName)) {
+                    throw new SqlParseException(
+                        column.getParserPosition(),
+                        "Duplicate column name [" + columnName + "], at "
+                            + column.getParserPosition());
+                }
+            }
+        }
+
+        checkKeysDefined(primaryKeyList, columnNames, "Primary key");
+        if (uniqueKeysList != null) {
+            for (SqlNodeList uniqueKeys : uniqueKeysList) {
+                checkKeysDefined(uniqueKeys, columnNames, "Unique key");
+            }
+        }
+        checkKeysDefined(partitionKeyList, columnNames, "Partition column");
+    }
+
+    /**
+     * Fails if any identifier in {@code keys} is not contained in {@code columnNames}.
+     *
+     * @param keys        key identifiers to check, may be null (nothing is checked then)
+     * @param columnNames names of the declared columns
+     * @param description leading words of the error message, e.g. "Primary key"
+     */
+    private static void checkKeysDefined(
+            SqlNodeList keys,
+            Set<String> columnNames,
+            String description) throws SqlParseException {
+        if (keys == null) {
+            return;
+        }
+        for (SqlNode keyNode : keys) {
+            String key = ((SqlIdentifier) keyNode).getSimple();
+            if (!columnNames.contains(key)) {
+                throw new SqlParseException(
+                    keyNode.getParserPosition(),
+                    description + " [" + key + "] not defined in columns, at "
+                        + keyNode.getParserPosition());
+            }
+        }
+    }
+
+    /** Returns true if at least one entry of the column list is a {@link SqlBasicCall}. */
+    public boolean containsComputedColumn() {
+        for (SqlNode column : columnList) {
+            if (column instanceof SqlBasicCall) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns the projection format of the DDL columns(including computed columns).
+     * e.g. If we got a DDL:
+     * <pre>
+     *   create table tbl1(
+     *     col1 int,
+     *     col2 varchar,
+     *     col3 as to_timestamp(col2)
+     *   ) with (
+     *     connector = 'csv'
+     *   )
+     * </pre>
+     * we would return a query like:
+     *
+     * <p>"col1, col2, to_timestamp(col2) as col3", caution that the "computed column" operands
+     * have been reversed.
+     */
+    public String getColumnSqlString() {
+        SqlPrettyWriter writer = new SqlPrettyWriter(AnsiSqlDialect.DEFAULT);
+        writer.setAlwaysUseParentheses(true);
+        writer.setSelectListItemsOnSeparateLines(false);
+        writer.setIndentation(0);
+        writer.startList("", "");
+        for (SqlNode column : columnList) {
+            writer.sep(",");
+            if (column instanceof SqlTableColumn) {
+                // Plain columns contribute only their name, not type or comment.
+                SqlTableColumn tableColumn = (SqlTableColumn) column;
+                tableColumn.getName().unparse(writer, 0, 0);
+            } else {
+                column.unparse(writer, 0, 0);
+            }
+        }
+
+        return writer.toString();
+    }
+
+    /**
+     * Writes this statement: the column list with the optional PRIMARY KEY and UNIQUE
+     * constraints, followed by the optional COMMENT, PARTITIONED BY and WITH clauses.
+     */
+    @Override
+    public void unparse(
+            SqlWriter writer,
+            int leftPrec,
+            int rightPrec) {
+        writer.keyword("CREATE TABLE");
+        tableName.unparse(writer, leftPrec, rightPrec);
+        SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
+        for (SqlNode column : columnList) {
+            printIndent(writer);
+            if (column instanceof SqlBasicCall) {
+                // Operand 1 is the column name (see validate()); swap the operands so that
+                // the name is written before the expression.
+                SqlCall call = (SqlCall) column;
+                SqlCall newCall = call.getOperator().createCall(
+                    SqlParserPos.ZERO,
+                    call.operand(1),
+                    call.operand(0));
+                newCall.unparse(writer, leftPrec, rightPrec);
+            } else {
+                column.unparse(writer, leftPrec, rightPrec);
+            }
+        }
+        if (primaryKeyList != null && primaryKeyList.size() > 0) {
+            printIndent(writer);
+            writer.keyword("PRIMARY KEY");
+            SqlWriter.Frame keyFrame = writer.startList("(", ")");
+            primaryKeyList.unparse(writer, leftPrec, rightPrec);
+            writer.endList(keyFrame);
+        }
+        if (uniqueKeysList != null && uniqueKeysList.size() > 0) {
+            for (SqlNodeList uniqueKeyList : uniqueKeysList) {
+                // Each constraint is its own list item; emitting the separator only once
+                // before the loop would glue several UNIQUE clauses together.
+                printIndent(writer);
+                writer.keyword("UNIQUE");
+                SqlWriter.Frame keyFrame = writer.startList("(", ")");
+                uniqueKeyList.unparse(writer, leftPrec, rightPrec);
+                writer.endList(keyFrame);
+            }
+        }
+        writer.newlineAndIndent();
+        writer.endList(frame);
+
+        if (comment != null) {
+            writer.newlineAndIndent();
+            writer.keyword("COMMENT");
+            comment.unparse(writer, leftPrec, rightPrec);
+        }
+
+        if (this.partitionKeyList != null) {
+            writer.newlineAndIndent();
+            writer.keyword("PARTITIONED BY");
+            SqlWriter.Frame withFrame = writer.startList("(", ")");
+            this.partitionKeyList.unparse(writer, leftPrec, rightPrec);
+            writer.endList(withFrame);
+            writer.newlineAndIndent();
+        }
+
+        if (propertyList != null) {
+            writer.keyword("WITH");
+            SqlWriter.Frame withFrame = writer.startList("(", ")");
+            for (SqlNode property : propertyList) {
+                printIndent(writer);
+                property.unparse(writer, leftPrec, rightPrec);
+            }
+            writer.newlineAndIndent();
+            writer.endList(withFrame);
+        }
+    }
+
+    /** Writes the item separator, a line break and a two-space indent before a list item. */
+    private void printIndent(SqlWriter writer) {
+        writer.sep(",", false);
+        writer.newlineAndIndent();
+        writer.print("  ");
+    }
+
+    /**
+     * Table creation context.
+     *
+     * <p>A mutable holder for the column list, primary key and unique keys of a table.
+     */
+    public static class TableCreationContext {
+        public List<SqlNode> columnList = new ArrayList<>();
+        public SqlNodeList primaryKeyList;
+        public List<SqlNodeList> uniqueKeysList = new ArrayList<>();
+    }
+
+    /** Returns the name components of the table identifier as an array. */
+    public String[] fullTableName() {
+        return tableName.names.toArray(new String[0]);
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropTable.java
new file mode 100644
index 0000000..ee544c4
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropTable.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlDrop;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+/**
+ * DROP TABLE DDL sql call, consisting of the table name and an optional IF EXISTS flag.
+ */
+public class SqlDropTable extends SqlDrop {
+    private static final SqlOperator OPERATOR =
+        new SqlSpecialOperator("DROP TABLE", SqlKind.DROP_TABLE);
+
+    private SqlIdentifier tableName;
+    private boolean ifExists;
+
+    /**
+     * Creates a DROP TABLE call.
+     *
+     * @param pos       parser position of the statement
+     * @param tableName name of the table to drop
+     * @param ifExists  whether IF EXISTS was specified
+     */
+    public SqlDropTable(SqlParserPos pos, SqlIdentifier tableName, boolean ifExists) {
+        super(OPERATOR, pos, ifExists);
+        this.ifExists = ifExists;
+        this.tableName = tableName;
+    }
+
+    /** Returns a one-element list holding the table name. */
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(this.tableName);
+    }
+
+    /** Returns the name of the table to drop. */
+    public SqlIdentifier getTableName() {
+        return this.tableName;
+    }
+
+    /** Replaces the name of the table to drop. */
+    public void setTableName(SqlIdentifier viewName) {
+        tableName = viewName;
+    }
+
+    /** Returns whether this statement carries IF EXISTS. */
+    public boolean getIfExists() {
+        return ifExists;
+    }
+
+    /** Sets whether this statement carries IF EXISTS. */
+    public void setIfExists(boolean ifExists) {
+        this.ifExists = ifExists;
+    }
+
+    /** Writes {@code DROP TABLE}, then {@code IF EXISTS} when set, then the table name. */
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("DROP");
+        writer.keyword("TABLE");
+        final boolean guarded = this.ifExists;
+        if (guarded) {
+            writer.keyword("IF EXISTS");
+        }
+        this.tableName.unparse(writer, leftPrec, rightPrec);
+    }
+
+    /** Performs no checks. */
+    public void validate() {
+        // no-op
+    }
+
+    /** Returns the name components of the table identifier as an array. */
+    public String[] fullTableName() {
+        final String[] components = new String[0];
+        return this.tableName.names.toArray(components);
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlMapType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlMapType.java
new file mode 100644
index 0000000..98f549a
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlMapType.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Extended Flink MapType: an identifier named after {@link SqlTypeName#MAP} that also carries
+ * the type specifications of its keys and values.
+ */
+public class SqlMapType extends SqlIdentifier implements ExtendedSqlType {
+
+    private final SqlDataTypeSpec keyType;
+    private final SqlDataTypeSpec valType;
+
+    /**
+     * Creates a map type.
+     *
+     * @param pos     parser position of the type
+     * @param keyType type specification of the map keys
+     * @param valType type specification of the map values
+     */
+    public SqlMapType(SqlParserPos pos, SqlDataTypeSpec keyType, SqlDataTypeSpec valType) {
+        super(SqlTypeName.MAP.getName(), pos);
+        this.keyType = keyType;
+        this.valType = valType;
+    }
+
+    /** Returns the type specification of the map keys. */
+    public SqlDataTypeSpec getKeyType() {
+        return this.keyType;
+    }
+
+    /** Returns the type specification of the map values. */
+    public SqlDataTypeSpec getValType() {
+        return this.valType;
+    }
+
+    /** Writes the key and value types, comma-separated, between {@code MAP<} and {@code >}. */
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("MAP<");
+        ExtendedSqlType.unparseType(this.keyType, writer, leftPrec, rightPrec);
+        writer.sep(",");
+        ExtendedSqlType.unparseType(this.valType, writer, leftPrec, rightPrec);
+        writer.keyword(">");
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlRowType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlRowType.java
new file mode 100644
index 0000000..3958053
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlRowType.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Pair;
+
+import java.util.List;
+
+/**
+ * Extended Flink row type: an identifier named after {@link SqlTypeName#ROW} that also carries
+ * the names and type specifications of its fields in two parallel lists.
+ */
+public class SqlRowType extends SqlIdentifier implements ExtendedSqlType {
+
+    private final List<SqlIdentifier> fieldNames;
+    private final List<SqlDataTypeSpec> fieldTypes;
+
+    /**
+     * Creates a row type.
+     *
+     * @param pos        parser position of the type
+     * @param fieldNames names of the fields
+     * @param fieldTypes type specifications of the fields, parallel to {@code fieldNames}
+     */
+    public SqlRowType(
+            SqlParserPos pos,
+            List<SqlIdentifier> fieldNames,
+            List<SqlDataTypeSpec> fieldTypes) {
+        super(SqlTypeName.ROW.getName(), pos);
+        this.fieldNames = fieldNames;
+        this.fieldTypes = fieldTypes;
+    }
+
+    /** Returns the field names. */
+    public List<SqlIdentifier> getFieldNames() {
+        return this.fieldNames;
+    }
+
+    /** Returns the field type specifications. */
+    public List<SqlDataTypeSpec> getFieldTypes() {
+        return this.fieldTypes;
+    }
+
+    /** Returns the number of field names. */
+    public int getArity() {
+        return this.fieldNames.size();
+    }
+
+    /** Returns the name of the field at index {@code i}. */
+    public SqlIdentifier getFieldName(int i) {
+        return this.fieldNames.get(i);
+    }
+
+    /** Returns the type specification of the field at index {@code i}. */
+    public SqlDataTypeSpec getFieldType(int i) {
+        return this.fieldTypes.get(i);
+    }
+
+    /**
+     * Writes {@code ROW} followed by the comma-separated {@code name : type} fields between
+     * {@code <} and {@code >}.
+     */
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("ROW");
+        final SqlWriter.Frame rowFrame =
+            writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">");
+        final List<Pair<SqlIdentifier, SqlDataTypeSpec>> fields =
+            Pair.zip(fieldNames, fieldTypes);
+        for (Pair<SqlIdentifier, SqlDataTypeSpec> field : fields) {
+            final SqlIdentifier fieldName = field.left;
+            final SqlDataTypeSpec fieldType = field.right;
+            writer.sep(",", false);
+            // The name is written with precedence 0; the type keeps the caller's precedence.
+            fieldName.unparse(writer, 0, 0);
+            writer.sep(":");
+            ExtendedSqlType.unparseType(fieldType, writer, leftPrec, rightPrec);
+        }
+        writer.endList(rowFrame);
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java
new file mode 100644
index 0000000..bcd578f
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Table column of a CREATE TABLE DDL, made of a name, a type and an optional comment.
+ */
+public class SqlTableColumn extends SqlCall {
+
+    // Fully qualified so that the file's import block does not need to change.
+    private static final SqlOperator OPERATOR =
+        new org.apache.calcite.sql.SqlSpecialOperator(
+            "COLUMN_DECL", org.apache.calcite.sql.SqlKind.COLUMN_DECL);
+
+    private SqlIdentifier name;
+    private SqlDataTypeSpec type;
+    private SqlCharStringLiteral comment;
+
+    /**
+     * Creates a table column.
+     *
+     * @param name    column name, must not be null
+     * @param type    column type, must not be null
+     * @param comment column comment, may be null
+     * @param pos     parser position of the column
+     * @throws NullPointerException if {@code name} or {@code type} is null
+     */
+    public SqlTableColumn(SqlIdentifier name,
+            SqlDataTypeSpec type,
+            SqlCharStringLiteral comment,
+            SqlParserPos pos) {
+        super(pos);
+        this.name = requireNonNull(name, "Column name should not be null");
+        this.type = requireNonNull(type, "Column type should not be null");
+        this.comment = comment;
+    }
+
+    /**
+     * Returns the column declaration operator.
+     *
+     * <p>A non-null operator is needed because generic {@link SqlCall} code dereferences it.
+     */
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    /** Returns the name, the type and the comment; the comment entry may be null. */
+    @Override
+    public List<SqlNode> getOperandList() {
+        return org.apache.calcite.util.ImmutableNullableList.of(name, type, comment);
+    }
+
+    /** Writes the name, the type and, when a comment is present, {@code COMMENT} plus the comment. */
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        this.name.unparse(writer, leftPrec, rightPrec);
+        writer.print(" ");
+        ExtendedSqlType.unparseType(type, writer, leftPrec, rightPrec);
+        if (this.comment != null) {
+            writer.print(" COMMENT ");
+            this.comment.unparse(writer, leftPrec, rightPrec);
+        }
+    }
+
+    public SqlIdentifier getName() {
+        return name;
+    }
+
+    public void setName(SqlIdentifier name) {
+        this.name = name;
+    }
+
+    public SqlDataTypeSpec getType() {
+        return type;
+    }
+
+    public void setType(SqlDataTypeSpec type) {
+        this.type = type;
+    }
+
+    public SqlCharStringLiteral getComment() {
+        return comment;
+    }
+
+    public void setComment(SqlCharStringLiteral comment) {
+        this.comment = comment;
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/error/SqlParseException.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/error/SqlParseException.java
new file mode 100644
index 0000000..365de20
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/error/SqlParseException.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.error;
+
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/**
+ * SQL parse Exception. This is a simpler version
+ * of Calcite {@link org.apache.calcite.sql.parser.SqlParseException}
+ * which is used for SqlNode validation.
+ *
+ * <p>It carries the position of the error and keeps its message in a field of its own, which
+ * {@link #getMessage()} returns.
+ */
+public class SqlParseException extends Exception {
+
+    private SqlParserPos errorPosition;
+
+    private String message;
+
+    /**
+     * Creates an exception without a cause.
+     *
+     * @param errorPosition position of the error
+     * @param message       error message
+     */
+    public SqlParseException(SqlParserPos errorPosition, String message) {
+        this.message = message;
+        this.errorPosition = errorPosition;
+    }
+
+    /**
+     * Creates an exception with a cause.
+     *
+     * @param errorPosition position of the error
+     * @param message       error message
+     * @param e             underlying cause
+     */
+    public SqlParseException(SqlParserPos errorPosition, String message, Exception e) {
+        super(e);
+        this.message = message;
+        this.errorPosition = errorPosition;
+    }
+
+    /** Returns the message held by this exception. */
+    @Override
+    public String getMessage() {
+        return this.message;
+    }
+
+    /** Replaces the message held by this exception. */
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    /** Returns the position of the error. */
+    public SqlParserPos getErrorPosition() {
+        return this.errorPosition;
+    }
+
+    /** Replaces the position of the error. */
+    public void setErrorPosition(SqlParserPos errorPosition) {
+        this.errorPosition = errorPosition;
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/SqlTimeUnit.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/SqlTimeUnit.java
new file mode 100644
index 0000000..950399a
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/SqlTimeUnit.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.utils;
+
+import org.apache.calcite.sql.SqlWriter;
+
+/**
+ * SqlTimeUnit used for Flink DDL sql.
+ *
+ * <p>Every unit carries the keyword it is unparsed as and its length in milliseconds.
+ **/
+public enum SqlTimeUnit {
+    DAY("DAY", 24 * 3600 * 1000),
+    HOUR("HOUR", 3600 * 1000),
+    MINUTE("MINUTE", 60 * 1000),
+    SECOND("SECOND", 1000),
+    MILLISECOND("MILLISECOND", 1);
+
+    /** Unparsing keyword. */
+    private final String keyword;
+    /** Times used to transform this time unit to millisecond. **/
+    private final long timeToMillisecond;
+
+    SqlTimeUnit(String keyword, long timeToMillisecond) {
+        this.keyword = keyword;
+        this.timeToMillisecond = timeToMillisecond;
+    }
+
+    /**
+     * Converts an interval expressed in this unit to milliseconds.
+     *
+     * @param timeInterval number of units
+     * @return {@code timeInterval} multiplied by the length of this unit in milliseconds
+     */
+    public long populateAsMillisecond(int timeInterval) {
+        return timeToMillisecond * timeInterval;
+    }
+
+    /** Writes the keyword of this unit to the given writer. */
+    public void unparse(SqlWriter writer) {
+        writer.keyword(keyword);
+    }
+
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
new file mode 100644
index 0000000..d6d88be
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.error.SqlParseException;
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
+
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParserImplFactory;
+import org.apache.calcite.sql.parser.SqlParserTest;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+/** FlinkSqlParserImpl tests. **/
+public class FlinkSqlParserImplTest extends SqlParserTest {
+
+ /**
+  * Supplies the Flink parser factory so the tests of this class run against
+  * {@link FlinkSqlParserImpl}.
+  */
+ @Override
+ protected SqlParserImplFactory parserImplFactory() {
+     return FlinkSqlParserImpl.FACTORY;
+ }
+
+ @Test
+ public void testCreateTable() {
+     // DDL with computed columns, a primary key, partition columns and properties.
+     final String ddl = "CREATE TABLE tbl1 (\n"
+         + " a bigint,\n"
+         + " h varchar, \n"
+         + " g as 2 * (a + 1), \n"
+         + " ts as toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'), \n"
+         + " b varchar,\n"
+         + " proc as PROCTIME(), \n"
+         + " PRIMARY KEY (a, b)\n"
+         + ")\n"
+         + "PARTITIONED BY (a, h)\n"
+         + " with (\n"
+         + " connector = 'kafka', \n"
+         + " kafka.topic = 'log.test'\n"
+         + ")\n";
+     final String expected = "CREATE TABLE `TBL1` (\n"
+         + " `A` BIGINT,\n"
+         + " `H` VARCHAR,\n"
+         + " `G` AS (2 * (`A` + 1)),\n"
+         + " `TS` AS `TOTIMESTAMP`(`B`, 'yyyy-MM-dd HH:mm:ss'),\n"
+         + " `B` VARCHAR,\n"
+         + " `PROC` AS `PROCTIME`(),\n"
+         + " PRIMARY KEY (`A`, `B`)\n"
+         + ")\n"
+         + "PARTITIONED BY (`A`, `H`)\n"
+         + "WITH (\n"
+         + " `CONNECTOR` = 'kafka',\n"
+         + " `KAFKA`.`TOPIC` = 'log.test'\n"
+         + ")";
+     check(ddl, expected);
+ }
+
+ @Test
+ public void testCreateTableWithComment() {
+     // Same table as testCreateTable, plus a column comment and a table comment.
+     final String ddl = "CREATE TABLE tbl1 (\n"
+         + " a bigint comment 'test column comment AAA.',\n"
+         + " h varchar, \n"
+         + " g as 2 * (a + 1), \n"
+         + " ts as toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'), \n"
+         + " b varchar,\n"
+         + " proc as PROCTIME(), \n"
+         + " PRIMARY KEY (a, b)\n"
+         + ")\n"
+         + "comment 'test table comment ABC.'\n"
+         + "PARTITIONED BY (a, h)\n"
+         + " with (\n"
+         + " connector = 'kafka', \n"
+         + " kafka.topic = 'log.test'\n"
+         + ")\n";
+     final String expected = "CREATE TABLE `TBL1` (\n"
+         + " `A` BIGINT COMMENT 'test column comment AAA.',\n"
+         + " `H` VARCHAR,\n"
+         + " `G` AS (2 * (`A` + 1)),\n"
+         + " `TS` AS `TOTIMESTAMP`(`B`, 'yyyy-MM-dd HH:mm:ss'),\n"
+         + " `B` VARCHAR,\n"
+         + " `PROC` AS `PROCTIME`(),\n"
+         + " PRIMARY KEY (`A`, `B`)\n"
+         + ")\n"
+         + "COMMENT 'test table comment ABC.'\n"
+         + "PARTITIONED BY (`A`, `H`)\n"
+         + "WITH (\n"
+         + " `CONNECTOR` = 'kafka',\n"
+         + " `KAFKA`.`TOPIC` = 'log.test'\n"
+         + ")";
+     check(ddl, expected);
+ }
+
+ @Test
+ public void testCreateTableWithPrimaryKeyAndUniqueKey() {
+     // Table declaring both a PRIMARY KEY and a UNIQUE constraint.
+     final String ddl = "CREATE TABLE tbl1 (\n"
+         + " a bigint comment 'test column comment AAA.',\n"
+         + " h varchar, \n"
+         + " g as 2 * (a + 1), \n"
+         + " ts as toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'), \n"
+         + " b varchar,\n"
+         + " proc as PROCTIME(), \n"
+         + " PRIMARY KEY (a, b), \n"
+         + " UNIQUE (h, g)\n"
+         + ")\n"
+         + "comment 'test table comment ABC.'\n"
+         + "PARTITIONED BY (a, h)\n"
+         + " with (\n"
+         + " connector = 'kafka', \n"
+         + " kafka.topic = 'log.test'\n"
+         + ")\n";
+     final String expected = "CREATE TABLE `TBL1` (\n"
+         + " `A` BIGINT COMMENT 'test column comment AAA.',\n"
+         + " `H` VARCHAR,\n"
+         + " `G` AS (2 * (`A` + 1)),\n"
+         + " `TS` AS `TOTIMESTAMP`(`B`, 'yyyy-MM-dd HH:mm:ss'),\n"
+         + " `B` VARCHAR,\n"
+         + " `PROC` AS `PROCTIME`(),\n"
+         + " PRIMARY KEY (`A`, `B`),\n"
+         + " UNIQUE (`H`, `G`)\n"
+         + ")\n"
+         + "COMMENT 'test table comment ABC.'\n"
+         + "PARTITIONED BY (`A`, `H`)\n"
+         + "WITH (\n"
+         + " `CONNECTOR` = 'kafka',\n"
+         + " `KAFKA`.`TOPIC` = 'log.test'\n"
+         + ")";
+     check(ddl, expected);
+ }
+
+ @Ignore // need to implement
+ @Test
+ public void testCreateTableWithoutWatermarkFieldName() {
+     // WATERMARK clause without a watermark name, delay given in MILLISECOND.
+     final String ddl = "CREATE TABLE tbl1 (\n"
+         + " a bigint,\n"
+         + " b varchar, \n"
+         + " c as 2 * (a + 1), \n"
+         + " WATERMARK FOR a AS BOUNDED WITH DELAY 1000 MILLISECOND\n"
+         + ")\n"
+         + " with (\n"
+         + " connector = 'kafka', \n"
+         + " kafka.topic = 'log.test'\n"
+         + ")\n";
+     final String expected = "CREATE TABLE `TBL1` (\n"
+         + " `A` BIGINT,\n"
+         + " `B` VARCHAR,\n"
+         + " `C` AS (2 * (`A` + 1)),\n"
+         + " WATERMARK FOR `A` AS BOUNDED WITH DELAY 1000 MILLISECOND\n"
+         + ") WITH (\n"
+         + " `CONNECTOR` = 'kafka',\n"
+         + " `KAFKA`.`TOPIC` = 'log.test'\n"
+         + ")";
+     check(ddl, expected);
+ }
+
+ @Ignore // need to implement
+ @Test
+ public void testCreateTableWithWatermarkBoundedDelay() {
+     // Named watermark with a bounded delay given in DAY.
+     final String ddl = "CREATE TABLE tbl1 (\n"
+         + " a bigint,\n"
+         + " b varchar, \n"
+         + " c as 2 * (a + 1), \n"
+         + " WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 DAY\n"
+         + ")\n"
+         + " with (\n"
+         + " connector = 'kafka', \n"
+         + " kafka.topic = 'log.test'\n"
+         + ")\n";
+     final String expected = "CREATE TABLE `TBL1` (\n"
+         + " `A` BIGINT,\n"
+         + " `B` VARCHAR,\n"
+         + " `C` AS (2 * (`A` + 1)),\n"
+         + " WATERMARK `WK` FOR `A` AS BOUNDED WITH DELAY 1000 DAY\n"
+         + ") WITH (\n"
+         + " `CONNECTOR` = 'kafka',\n"
+         + " `KAFKA`.`TOPIC` = 'log.test'\n"
+         + ")";
+     check(ddl, expected);
+ }
+
+ @Ignore // need to implement
+ @Test
+ public void testCreateTableWithWatermarkBoundedDelay1() {
+     // Named watermark with a bounded delay given in HOUR.
+     final String ddl = "CREATE TABLE tbl1 (\n"
+         + " a bigint,\n"
+         + " b varchar, \n"
+         + " c as 2 * (a + 1), \n"
+         + " WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 HOUR\n"
+         + ")\n"
+         + " with (\n"
+         + " connector = 'kafka', \n"
+         + " kafka.topic = 'log.test'\n"
+         + ")\n";
+     final String expected = "CREATE TABLE `TBL1` (\n"
+         + " `A` BIGINT,\n"
+         + " `B` VARCHAR,\n"
+         + " `C` AS (2 * (`A` + 1)),\n"
+         + " WATERMARK `WK` FOR `A` AS BOUNDED WITH DELAY 1000 HOUR\n"
+         + ") WITH (\n"
+         + " `CONNECTOR` = 'kafka',\n"
+         + " `KAFKA`.`TOPIC` = 'log.test'\n"
+         + ")";
+     check(ddl, expected);
+ }
+
+ @Ignore // need to implement
+ @Test
+ public void testCreateTableWithWatermarkBoundedDelay2() {
+     // Named watermark with a bounded delay given in MINUTE.
+     final String ddl = "CREATE TABLE tbl1 (\n"
+         + " a bigint,\n"
+         + " b varchar, \n"
+         + " c as 2 * (a + 1), \n"
+         + " WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 MINUTE\n"
+         + ")\n"
+         + " with (\n"
+         + " connector = 'kafka', \n"
+         + " kafka.topic = 'log.test'\n"
+         + ")\n";
+     final String expected = "CREATE TABLE `TBL1` (\n"
+         + " `A` BIGINT,\n"
+         + " `B` VARCHAR,\n"
+         + " `C` AS (2 * (`A` + 1)),\n"
+         + " WATERMARK `WK` FOR `A` AS BOUNDED WITH DELAY 1000 MINUTE\n"
+         + ") WITH (\n"
+         + " `CONNECTOR` = 'kafka',\n"
+         + " `KAFKA`.`TOPIC` = 'log.test'\n"
+         + ")";
+     check(ddl, expected);
+ }
+
+ @Ignore // need to implement
+ @Test
+ public void testCreateTableWithWatermarkBoundedDelay3() {
+     // Named watermark with a bounded delay given in SECOND.
+     final String ddl = "CREATE TABLE tbl1 (\n"
+         + " a bigint,\n"
+         + " b varchar, \n"
+         + " c as 2 * (a + 1), \n"
+         + " WATERMARK wk FOR a AS BOUNDED WITH DELAY 1000 SECOND\n"
+         + ")\n"
+         + " with (\n"
+         + " connector = 'kafka', \n"
+         + " kafka.topic = 'log.test'\n"
+         + ")\n";
+     final String expected = "CREATE TABLE `TBL1` (\n"
+         + " `A` BIGINT,\n"
+         + " `B` VARCHAR,\n"
+         + " `C` AS (2 * (`A` + 1)),\n"
+         + " WATERMARK `WK` FOR `A` AS BOUNDED WITH DELAY 1000 SECOND\n"
+         + ") WITH (\n"
+         + " `CONNECTOR` = 'kafka',\n"
+         + " `KAFKA`.`TOPIC` = 'log.test'\n"
+         + ")";
+     check(ddl, expected);
+ }
+
+ @Ignore // need to implement
+ @Test
+ public void testCreateTableWithNegativeWatermarkOffsetDelay() {
+     // The carets mark the "-" sign in front of the delay value.
+     final String ddl = "CREATE TABLE tbl1 (\n"
+         + " a bigint,\n"
+         + " b varchar, \n"
+         + " c as 2 * (a + 1), \n"
+         + " WATERMARK wk FOR a AS BOUNDED WITH DELAY ^-^1000 SECOND\n"
+         + ")\n"
+         + " with (\n"
+         + " connector = 'kafka', \n"
+         + " kafka.topic = 'log.test'\n"
+         + ")\n";
+     final String expectedMsgPattern = "(?s).*Encountered \"-\" at line 5, column 44.\n"
+         + "Was expecting:\n"
+         + " <UNSIGNED_INTEGER_LITERAL> ...\n"
+         + ".*";
+     checkFails(ddl, expectedMsgPattern);
+ }
+
+ @Ignore // need to implement
+ @Test
+ public void testCreateTableWithWatermarkStrategyAscending() {
+     // Named watermark using the ASCENDING strategy.
+     final String ddl = "CREATE TABLE tbl1 (\n"
+         + " a bigint,\n"
+         + " b varchar, \n"
+         + " c as 2 * (a + 1), \n"
+         + " WATERMARK wk FOR a AS ASCENDING\n"
+         + ")\n"
+         + " with (\n"
+         + " connector = 'kafka', \n"
+         + " kafka.topic = 'log.test'\n"
+         + ")\n";
+     final String expected = "CREATE TABLE `TBL1` (\n"
+         + " `A` BIGINT,\n"
+         + " `B` VARCHAR,\n"
+         + " `C` AS (2 * (`A` + 1)),\n"
+         + " WATERMARK `WK` FOR `A` AS ASCENDING\n"
+         + ") WITH (\n"
+         + " `CONNECTOR` = 'kafka',\n"
+         + " `KAFKA`.`TOPIC` = 'log.test'\n"
+         + ")";
+     check(ddl, expected);
+ }
+
+ @Ignore // need to implement
+ @Test
+ public void testCreateTableWithWatermarkStrategyFromSource() {
+     // Named watermark using the FROM_SOURCE strategy.
+     final String ddl = "CREATE TABLE tbl1 (\n"
+         + " a bigint,\n"
+         + " b varchar, \n"
+         + " c as 2 * (a + 1), \n"
+         + " WATERMARK wk FOR a AS FROM_SOURCE\n"
+         + ")\n"
+         + " with (\n"
+         + " connector = 'kafka', \n"
+         + " kafka.topic = 'log.test'\n"
+         + ")\n";
+     final String expected = "CREATE TABLE `TBL1` (\n"
+         + " `A` BIGINT,\n"
+         + " `B` VARCHAR,\n"
+         + " `C` AS (2 * (`A` + 1)),\n"
+         + " WATERMARK `WK` FOR `A` AS FROM_SOURCE\n"
+         + ") WITH (\n"
+         + " `CONNECTOR` = 'kafka',\n"
+         + " `KAFKA`.`TOPIC` = 'log.test'\n"
+         + ")";
+     check(ddl, expected);
+ }
+
+ @Test
+ public void testCreateTableWithComplexType() {
+     // Columns typed as ARRAY, MAP and ROW.
+     final String ddl = "CREATE TABLE tbl1 (\n"
+         + " a ARRAY<bigint>, \n"
+         + " b MAP<int, varchar>,\n"
+         + " c ROW<cc0:int, cc1: float, cc2: varchar>,\n"
+         + " PRIMARY KEY (a, b) \n"
+         + ") with (\n"
+         + " x = 'y', \n"
+         + " asd = 'data'\n"
+         + ")\n";
+     final String expected = "CREATE TABLE `TBL1` (\n"
+         + " `A` ARRAY< BIGINT >,\n"
+         + " `B` MAP< INTEGER, VARCHAR >,\n"
+         + " `C` ROW< `CC0` : INTEGER, `CC1` : FLOAT, `CC2` : VARCHAR >,\n"
+         + " PRIMARY KEY (`A`, `B`)\n"
+         + ") WITH (\n"
+         + " `X` = 'y',\n"
+         + " `ASD` = 'data'\n"
+         + ")";
+     check(ddl, expected);
+ }
+
+ @Test
+ public void testCreateTableWithDecimalType() {
+     // DECIMAL without arguments and with explicit precision and scale.
+     final String ddl = "CREATE TABLE tbl1 (\n"
+         + " a decimal, \n"
+         + " b decimal(10, 0),\n"
+         + " c decimal(38, 38),\n"
+         + " PRIMARY KEY (a, b) \n"
+         + ") with (\n"
+         + " x = 'y', \n"
+         + " asd = 'data'\n"
+         + ")\n";
+     final String expected = "CREATE TABLE `TBL1` (\n"
+         + " `A` DECIMAL,\n"
+         + " `B` DECIMAL(10, 0),\n"
+         + " `C` DECIMAL(38, 38),\n"
+         + " PRIMARY KEY (`A`, `B`)\n"
+         + ") WITH (\n"
+         + " `X` = 'y',\n"
+         + " `ASD` = 'data'\n"
+         + ")";
+     check(ddl, expected);
+ }
+
+ @Test
+ public void testCreateTableWithNestedComplexType() {
+     // ARRAY, MAP and ROW types nested inside each other.
+     final String ddl = "CREATE TABLE tbl1 (\n"
+         + " a ARRAY<ARRAY<bigint>>, \n"
+         + " b MAP<MAP<int, varchar>, ARRAY<varchar>>,\n"
+         + " c ROW<cc0:ARRAY<int>, cc1: float, cc2: varchar>,\n"
+         + " PRIMARY KEY (a, b) \n"
+         + ") with (\n"
+         + " x = 'y', \n"
+         + " asd = 'data'\n"
+         + ")\n";
+     final String expected = "CREATE TABLE `TBL1` (\n"
+         + " `A` ARRAY< ARRAY< BIGINT > >,\n"
+         + " `B` MAP< MAP< INTEGER, VARCHAR >, ARRAY< VARCHAR > >,\n"
+         + " `C` ROW< `CC0` : ARRAY< INTEGER >, `CC1` : FLOAT, `CC2` : VARCHAR >,\n"
+         + " PRIMARY KEY (`A`, `B`)\n"
+         + ") WITH (\n"
+         + " `X` = 'y',\n"
+         + " `ASD` = 'data'\n"
+         + ")";
+     check(ddl, expected);
+ }
+
+ @Test
+ public void testInvalidComputedColumn() {
+     // The carets mark a function call that is used as a column without "name AS".
+     final String ddl = "CREATE TABLE sls_stream (\n"
+         + " a bigint, \n"
+         + " b varchar,\n"
+         + " ^toTimestamp^(b, 'yyyy-MM-dd HH:mm:ss'), \n"
+         + " PRIMARY KEY (a, b) \n"
+         + ") with (\n"
+         + " x = 'y', \n"
+         + " asd = 'data'\n"
+         + ")\n";
+     final String expectedMsgPattern =
+         "(?s).*Encountered \"toTimestamp \\(\" at line 4, column 3.\n"
+         + "Was expecting one of:\n"
+         + " <IDENTIFIER> \"CHARACTER\" ...\n"
+         + " <IDENTIFIER> \"CHAR\" ...\n"
+         + ".*";
+     checkFails(ddl, expectedMsgPattern);
+ }
+
+ @Test
+ public void testColumnSqlString() {
+     // Mix of physical and computed columns whose column sql string is checked.
+     final String ddl = "CREATE TABLE sls_stream (\n"
+         + " a bigint, \n"
+         + " f as a + 1, \n"
+         + " b varchar,\n"
+         + " ts as toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'), \n"
+         + " proc as PROCTIME(),\n"
+         + " c int,\n"
+         + " PRIMARY KEY (a, b) \n"
+         + ") with (\n"
+         + " x = 'y', \n"
+         + " asd = 'data'\n"
+         + ")\n";
+     final String columnSql = "`A`, (`A` + 1) AS `F`, `B`, " +
+         "`TOTIMESTAMP`(`B`, 'yyyy-MM-dd HH:mm:ss') AS `TS`, " +
+         "`PROCTIME`() AS `PROC`, `C`";
+     final ValidationMatcher matcher = new ValidationMatcher().expectColumnSql(columnSql);
+     sql(ddl).node(matcher);
+ }
+
+ @Test
+ public void testCreateInvalidPartitionedTable() {
+ String sql = "create table sls_stream1(\n" +
+ " a bigint,\n" +
+ " b VARCHAR,\n" +
+ " PRIMARY KEY(a, b)\n" +
+ ") PARTITIONED BY (\n" +
+ " c,\n" +
+ " d\n" +
+ ") with ( x = 'y', asd = 'dada')";
+ sql(sql).node(new ValidationMatcher()
+ .fails("Partition column [C] not defined in columns, at line 6, column 3"));
+
+ }
+
+ @Test
+ public void testDropTable() {
+     // DROP TABLE on a three-part table identifier.
+     check(
+         "DROP table catalog1.db1.tbl1",
+         "DROP TABLE `CATALOG1`.`DB1`.`TBL1`");
+ }
+
+ @Test
+ public void testDropIfExists() {
+     // DROP TABLE with the IF EXISTS clause.
+     check(
+         "DROP table IF EXISTS catalog1.db1.tbl1",
+         "DROP TABLE IF EXISTS `CATALOG1`.`DB1`.`TBL1`");
+ }
+
+ /** Matcher that invokes the #validate() of the produced SqlNode. **/
+ private static class ValidationMatcher extends BaseMatcher<SqlNode> {
+ private String expectedColumnSql;
+ private String failMsg;
+
+ public ValidationMatcher expectColumnSql(String s) {
+ this.expectedColumnSql = s;
+ return this;
+ }
+
+ public ValidationMatcher fails(String failMsg) {
+ this.failMsg = failMsg;
+ return this;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("test");
+ }
+
+ @Override
+ public boolean matches(Object item) {
+ if (item instanceof SqlCreateTable) {
+ SqlCreateTable createTable = (SqlCreateTable) item;
+ try {
+ createTable.validate();
+ } catch (SqlParseException e) {
+ assertEquals(failMsg, e.getMessage());
+ }
+ if (expectedColumnSql != null) {
+ assertEquals(expectedColumnSql, createTable.getColumnSqlString());
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlUnParserTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlUnParserTest.java
new file mode 100644
index 0000000..ce3ac2d
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlUnParserTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+/**
+ * Variant of {@link FlinkSqlParserImplTest} that runs the inherited tests with the
+ * unparsing tester, to ensure that every expression can un-parse successfully.
+ */
+public class FlinkSqlUnParserTest extends FlinkSqlParserImplTest {
+
+    /** Uses the tester that exercises unparsing. */
+    @Override
+    protected Tester getTester() {
+        return new UnparsingTesterImpl();
+    }
+
+    /** Flags this test class as an unparser test. */
+    @Override
+    protected boolean isUnparserTest() {
+        return true;
+    }
+}
diff --git a/flink-table/flink-table-planner-blink/pom.xml b/flink-table/flink-table-planner-blink/pom.xml
index f960b47..7715207 100644
--- a/flink-table/flink-table-planner-blink/pom.xml
+++ b/flink-table/flink-table-planner-blink/pom.xml
@@ -102,6 +102,12 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-parser</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlExecutableStatement.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlExecutableStatement.java
new file mode 100644
index 0000000..7ac3106
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sqlexec/SqlExecutableStatement.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sqlexec;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.ReflectUtil;
+import org.apache.calcite.util.ReflectiveVisitor;
+
+/**
+ * Mix-in tool class for {@code SqlNode} that allows DDL commands to be
+ * executed directly.
+ *
+ * <p>For every supported kind of {@link SqlNode} there needs to be a corresponding
+ * {@code execute(type)} method, where the 'type' argument is the subclass
+ * type of the supported {@link SqlNode}.
+ */
+public class SqlExecutableStatement implements ReflectiveVisitor {
+    /** Table environment handed in by {@link #executeSqlNode(TableEnvironment, SqlNode)}. */
+    private final TableEnvironment tableEnv;
+
+    /** Dispatcher created for the methods named "execute" of this instance. */
+    private final ReflectUtil.MethodDispatcher<Void> dispatcher =
+        ReflectUtil.createMethodDispatcher(Void.class,
+            this,
+            "execute",
+            SqlNode.class);
+
+    //~ Constructors -----------------------------------------------------------
+
+    private SqlExecutableStatement(TableEnvironment tableEnvironment) {
+        this.tableEnv = tableEnvironment;
+    }
+
+    /**
+     * This is the main entrance for executing all kinds of DDL/DML {@code SqlNode}s. Each
+     * kind of SqlNode has its implementation in the {@code execute(type)} method whose
+     * 'type' argument is a subclass of {@code SqlNode}.
+     *
+     * <p>Caution: {@link #execute(SqlNode)} is only a fallback and should never be
+     * expected to be invoked.
+     *
+     * @param tableEnvironment TableEnvironment to interact with
+     * @param sqlNode SqlNode to execute on
+     */
+    public static void executeSqlNode(TableEnvironment tableEnvironment, SqlNode sqlNode) {
+        SqlExecutableStatement statement = new SqlExecutableStatement(tableEnvironment);
+        statement.dispatcher.invoke(sqlNode);
+    }
+
+    /**
+     * Execute the {@link SqlCreateTable} node.
+     */
+    public void execute(SqlCreateTable sqlCreateTable) {
+        // need to implement.
+    }
+
+    /** Fallback method to throw exception. */
+    public void execute(SqlNode node) {
+        throw new TableException("Should not invoke to node type "
+            + node.getClass().getSimpleName());
+    }
+}
diff --git a/flink-table/pom.xml b/flink-table/pom.xml
index 5213ea5..57918da 100644
--- a/flink-table/pom.xml
+++ b/flink-table/pom.xml
@@ -43,6 +43,7 @@ under the License.
<module>flink-table-runtime-blink</module>
<module>flink-table-uber</module>
<module>flink-sql-client</module>
+ <module>flink-sql-parser</module>
</modules>
<properties>