You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/09/02 11:44:03 UTC

[flink] branch release-1.9 updated: [FLINK-13568][sql-parser] Fix DDL CREATE TABLE statement doesn't allow STRING data type

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 1f0c670  [FLINK-13568][sql-parser] Fix DDL CREATE TABLE statement doesn't allow STRING data type
1f0c670 is described below

commit 1f0c67087f19b10d1218af30282414f8b032f2a8
Author: yuzhao.cyz <yu...@alibaba-inc.com>
AuthorDate: Tue Aug 6 20:35:15 2019 +0800

    [FLINK-13568][sql-parser] Fix DDL CREATE TABLE statement doesn't allow STRING data type
    
    This closes #9354
---
 .../apache/flink/sql/parser/ddl/SqlColumnType.java |  62 -----
 .../flink/sql/parser/ddl/SqlCreateTable.java       |  12 +-
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  13 ++
 .../table/sqlexec/SqlToOperationConverterTest.java | 187 +++++++++++++++
 .../table/sqlexec/SqlToOperationConverterTest.java | 255 +++++++++++++++++++++
 5 files changed, 458 insertions(+), 71 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlColumnType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlColumnType.java
deleted file mode 100644
index 3e494a72..0000000
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlColumnType.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.sql.parser.ddl;
-
-/**
- * All supported data types in DDL. Used for Create Table DDL validation.
- */
-public enum SqlColumnType {
-	BOOLEAN,
-	TINYINT,
-	SMALLINT,
-	INT,
-	INTEGER,
-	BIGINT,
-	REAL,
-	FLOAT,
-	DOUBLE,
-	DECIMAL,
-	DATE,
-	TIME,
-	TIMESTAMP,
-	VARCHAR,
-	VARBINARY,
-	ANY,
-	ARRAY,
-	MAP,
-	ROW,
-	UNSUPPORTED;
-
-	/** Returns the column type with the string representation. **/
-	public static SqlColumnType getType(String type) {
-		if (type == null) {
-			return UNSUPPORTED;
-		}
-		try {
-			return SqlColumnType.valueOf(type.toUpperCase());
-		} catch (IllegalArgumentException var1) {
-			return UNSUPPORTED;
-		}
-	}
-
-	/** Returns true if this type is unsupported. **/
-	public boolean isUnsupported() {
-		return this.equals(UNSUPPORTED);
-	}
-}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index 980aec4..183e2d8 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -135,12 +135,6 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
 				if (column instanceof SqlTableColumn) {
 					SqlTableColumn tableColumn = (SqlTableColumn) column;
 					columnName = tableColumn.getName().getSimple();
-					String typeName = tableColumn.getType().getTypeName().getSimple();
-					if (SqlColumnType.getType(typeName).isUnsupported()) {
-						throw new SqlParseException(
-							column.getParserPosition(),
-							"Not support type [" + typeName + "], at " + column.getParserPosition());
-					}
 				} else if (column instanceof SqlBasicCall) {
 					SqlBasicCall tableColumn = (SqlBasicCall) column;
 					columnName = tableColumn.getOperands()[1].toString();
@@ -241,9 +235,9 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
 
 	@Override
 	public void unparse(
-		SqlWriter writer,
-		int leftPrec,
-		int rightPrec) {
+			SqlWriter writer,
+			int leftPrec,
+			int rightPrec) {
 		writer.keyword("CREATE TABLE");
 		tableName.unparse(writer, leftPrec, rightPrec);
 		SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 97bb093..7a494b3 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -408,6 +408,19 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 	}
 
 	@Test
+	public void testCreateTableWithUserDefinedType() {
+		// Both columns use qualified, non-built-in type names; checkFails asserts the statement is rejected.
+		final String ddl = "create table t(\n" +
+			"  a catalog1.db1.MyType1,\n" +
+			"  b db2.MyType2\n" +
+			") with (\n" +
+			"  'k1' = 'v1',\n" +
+			"  'k2' = 'v2'\n" +
+			")";
+		checkFails(ddl, "UDT in DDL is not supported yet.");
+	}
+
+	@Test
 	public void testInvalidComputedColumn() {
 		checkFails("CREATE TABLE sls_stream (\n" +
 			"  a bigint, \n" +
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 6fa3c31..1acdd88 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -48,9 +48,12 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
@@ -190,6 +193,156 @@ public class SqlToOperationConverterTest {
 		assertEquals(expectedStaticPartitions, sinkModifyOperation.getStaticPartitions());
 	}
 
+	@Test // TODO: tweak the tests when FLINK-13604 is fixed (presumably the items commented "Expect to be ..." below).
+	public void testCreateTableWithFullDataTypes() {
+		final List<TestItem> testItems = Arrays.asList(
+			createTestItem("CHAR", DataTypes.CHAR(1)),
+			createTestItem("CHAR NOT NULL", DataTypes.CHAR(1).notNull()),
+			createTestItem("CHAR NULL", DataTypes.CHAR(1)),
+			createTestItem("CHAR(33)", DataTypes.CHAR(33)),
+			createTestItem("VARCHAR", DataTypes.STRING()),
+			createTestItem("VARCHAR(33)", DataTypes.VARCHAR(33)),
+			createTestItem("STRING", DataTypes.STRING()),
+			createTestItem("BOOLEAN", DataTypes.BOOLEAN()),
+			createTestItem("BINARY", DataTypes.BINARY(1)),
+			createTestItem("BINARY(33)", DataTypes.BINARY(33)),
+			createTestItem("VARBINARY", DataTypes.BYTES()),
+			createTestItem("VARBINARY(33)", DataTypes.VARBINARY(33)),
+			createTestItem("BYTES", DataTypes.BYTES()),
+			createTestItem("DECIMAL", DataTypes.DECIMAL(10, 0)),
+			createTestItem("DEC", DataTypes.DECIMAL(10, 0)),
+			createTestItem("NUMERIC", DataTypes.DECIMAL(10, 0)),
+			createTestItem("DECIMAL(10)", DataTypes.DECIMAL(10, 0)),
+			createTestItem("DEC(10)", DataTypes.DECIMAL(10, 0)),
+			createTestItem("NUMERIC(10)", DataTypes.DECIMAL(10, 0)),
+			createTestItem("DECIMAL(10, 3)", DataTypes.DECIMAL(10, 3)),
+			createTestItem("DEC(10, 3)", DataTypes.DECIMAL(10, 3)),
+			createTestItem("NUMERIC(10, 3)", DataTypes.DECIMAL(10, 3)),
+			createTestItem("TINYINT", DataTypes.TINYINT()),
+			createTestItem("SMALLINT", DataTypes.SMALLINT()),
+			createTestItem("INTEGER", DataTypes.INT()),
+			createTestItem("INT", DataTypes.INT()),
+			createTestItem("BIGINT", DataTypes.BIGINT()),
+			createTestItem("FLOAT", DataTypes.FLOAT()),
+			createTestItem("DOUBLE", DataTypes.DOUBLE()),
+			createTestItem("DOUBLE PRECISION", DataTypes.DOUBLE()),
+			createTestItem("DATE", DataTypes.DATE()),
+			createTestItem("TIME", DataTypes.TIME()),
+			createTestItem("TIME WITHOUT TIME ZONE", DataTypes.TIME()),
+			// Expect to be TIME(3); for now the test asserts plain DataTypes.TIME().
+			createTestItem("TIME(3)", DataTypes.TIME()),
+			// Expect to be TIME(3); for now the test asserts plain DataTypes.TIME().
+			createTestItem("TIME(3) WITHOUT TIME ZONE", DataTypes.TIME()),
+			createTestItem("TIMESTAMP", DataTypes.TIMESTAMP(3)),
+			createTestItem("TIMESTAMP WITHOUT TIME ZONE", DataTypes.TIMESTAMP(3)),
+			createTestItem("TIMESTAMP(3)", DataTypes.TIMESTAMP(3)),
+			createTestItem("TIMESTAMP(3) WITHOUT TIME ZONE", DataTypes.TIMESTAMP(3)),
+			createTestItem("TIMESTAMP WITH LOCAL TIME ZONE",
+				DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+			createTestItem("TIMESTAMP(3) WITH LOCAL TIME ZONE",
+				DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+			createTestItem("ARRAY<TIMESTAMP(3) WITH LOCAL TIME ZONE>",
+				DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))),
+			createTestItem("ARRAY<INT NOT NULL>",
+				DataTypes.ARRAY(DataTypes.INT().notNull())),
+			createTestItem("INT ARRAY", DataTypes.ARRAY(DataTypes.INT())),
+			createTestItem("INT NOT NULL ARRAY",
+				DataTypes.ARRAY(DataTypes.INT().notNull())),
+			createTestItem("INT ARRAY NOT NULL",
+				DataTypes.ARRAY(DataTypes.INT()).notNull()),
+			createTestItem("MULTISET<INT NOT NULL>",
+				DataTypes.MULTISET(DataTypes.INT().notNull())),
+			createTestItem("INT MULTISET",
+				DataTypes.MULTISET(DataTypes.INT())),
+			createTestItem("INT NOT NULL MULTISET",
+				DataTypes.MULTISET(DataTypes.INT().notNull())),
+			createTestItem("INT MULTISET NOT NULL",
+				DataTypes.MULTISET(DataTypes.INT()).notNull()),
+			createTestItem("MAP<BIGINT, BOOLEAN>",
+				DataTypes.MAP(DataTypes.BIGINT(), DataTypes.BOOLEAN())),
+			// Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>; for now f0 is asserted without notNull().
+			createTestItem("ROW<f0 INT NOT NULL, f1 BOOLEAN>",
+				DataTypes.ROW(
+					DataTypes.FIELD("f0", DataTypes.INT()),
+					DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+			// Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>; for now f0 is asserted without notNull().
+			createTestItem("ROW(f0 INT NOT NULL, f1 BOOLEAN)",
+				DataTypes.ROW(
+					DataTypes.FIELD("f0", DataTypes.INT()),
+					DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+			createTestItem("ROW<`f0` INT>",
+				DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
+			createTestItem("ROW(`f0` INT)",
+				DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
+			createTestItem("ROW<>", DataTypes.ROW()),
+			createTestItem("ROW()", DataTypes.ROW()),
+			// Expect to be ROW<`f0` INT NOT NULL '...', `f1` BOOLEAN '...'>; for now the asserted type has neither NOT NULL nor the field comments.
+			createTestItem("ROW<f0 INT NOT NULL 'This is a comment.',"
+				+ " f1 BOOLEAN 'This as well.'>",
+				DataTypes.ROW(
+					DataTypes.FIELD("f0", DataTypes.INT()),
+					DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+			createTestItem("ARRAY<ROW<f0 INT, f1 BOOLEAN>>",
+				DataTypes.ARRAY(
+					DataTypes.ROW(
+						DataTypes.FIELD("f0", DataTypes.INT()),
+						DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+			createTestItem("ROW<f0 INT, f1 BOOLEAN> MULTISET",
+				DataTypes.MULTISET(
+					DataTypes.ROW(
+						DataTypes.FIELD("f0", DataTypes.INT()),
+						DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+			createTestItem("MULTISET<ROW<f0 INT, f1 BOOLEAN>>",
+				DataTypes.MULTISET(
+					DataTypes.ROW(
+						DataTypes.FIELD("f0", DataTypes.INT()),
+						DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+			createTestItem("ROW<f0 Row<f00 INT, f01 BOOLEAN>, "
+					+ "f1 INT ARRAY, "
+					+ "f2 BOOLEAN MULTISET>",
+				DataTypes.ROW(DataTypes.FIELD("f0",
+					DataTypes.ROW(
+						DataTypes.FIELD("f00", DataTypes.INT()),
+						DataTypes.FIELD("f01", DataTypes.BOOLEAN()))),
+					DataTypes.FIELD("f1", DataTypes.ARRAY(DataTypes.INT())),
+					DataTypes.FIELD("f2", DataTypes.MULTISET(DataTypes.BOOLEAN()))))
+		);
+		StringBuilder buffer = new StringBuilder("create table t1(\n"); // one column "f<i> <type expr>" per item, in list order
+		for (int i = 0; i < testItems.size(); i++) {
+			buffer.append("f")
+				.append(i)
+				.append(" ")
+				.append(testItems.get(i).testExpr);
+			if (i == testItems.size() - 1) {
+				buffer.append(")");
+			} else {
+				buffer.append(",\n");
+			}
+		}
+		final String sql = buffer.toString();
+		final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+		SqlNode node = planner.parse(sql);
+		assert node instanceof SqlCreateTable; // plain Java assert: only evaluated when the JVM runs with -ea
+		Operation operation = SqlToOperationConverter.convert(planner, node);
+		TableSchema schema = ((CreateTableOperation) operation).getCatalogTable().getSchema();
+		Object[] expectedDataTypes = testItems.stream().map(item -> item.expectedType).toArray();
+		assertArrayEquals(expectedDataTypes, schema.getFieldDataTypes()); // element i must equal the type of column f<i>
+	}
+
+	//~ Tool Methods ----------------------------------------------------------
+
+	private static TestItem createTestItem(Object... args) {
+		assert args.length == 2;
+		final String testExpr = (String) args[0];
+		TestItem testItem = TestItem.fromTestExpr(testExpr);
+		if (args[1] instanceof String) {
+			testItem.withExpectedError((String) args[1]);
+		} else {
+			testItem.withExpectedType(args[1]);
+		}
+		return testItem;
+	}
+
 	private Operation parse(String sql, FlinkPlannerImpl planner) {
 		SqlNode node = planner.parse(sql);
 		return SqlToOperationConverter.convert(planner, node);
@@ -200,4 +353,38 @@ public class SqlToOperationConverterTest {
 		return plannerContext.createFlinkPlanner(catalogManager.getCurrentCatalog(),
 			catalogManager.getCurrentDatabase());
 	}
+
+	//~ Inner Classes ----------------------------------------------------------
+
+	private static class TestItem {
+		private final String testExpr;
+		@Nullable
+		private Object expectedType;
+		@Nullable
+		private String expectedError;
+
+		private TestItem(String testExpr) {
+			this.testExpr = testExpr;
+		}
+
+		static TestItem fromTestExpr(String testExpr) {
+			return new TestItem(testExpr);
+		}
+
+		TestItem withExpectedType(Object expectedType) {
+			this.expectedType = expectedType;
+			return this;
+		}
+
+		TestItem withExpectedError(String expectedError) {
+			this.expectedError = expectedError;
+			return this;
+		}
+
+		@Override
+		public String toString() {
+			return this.testExpr;
+		}
+	}
+
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 7225cbc..0424109 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -23,7 +23,9 @@ import org.apache.flink.sql.parser.dml.RichSqlInsert;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
 import org.apache.flink.table.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
@@ -43,14 +45,20 @@ import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.planner.PlanningConfigurationBuilder;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
 
 import org.apache.calcite.sql.SqlNode;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import javax.annotation.Nullable;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
@@ -74,6 +82,9 @@ public class SqlToOperationConverterTest {
 			new ExpressionBridge<>(functionCatalog,
 				PlannerExpressionConverter.INSTANCE()));
 
+	@Rule
+	public ExpectedException expectedEx = ExpectedException.none();
+
 	@Before
 	public void before() throws TableAlreadyExistException, DatabaseNotExistException {
 		final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
@@ -195,9 +206,253 @@ public class SqlToOperationConverterTest {
 		assertEquals(expectedStaticPartitions, sinkModifyOperation.getStaticPartitions());
 	}
 
+	@Test // TODO: tweak the tests when FLINK-13604 is fixed (presumably the items commented "Expect to be ..." below).
+	public void testCreateTableWithFullDataTypes() {
+		final List<TestItem> testItems = Arrays.asList(
+			// Expect to be DataTypes.CHAR(1); for now STRING is asserted (the "Expect to be" notes below follow the same pattern).
+			createTestItem("CHAR", DataTypes.STRING()),
+			// Expect to be DataTypes.CHAR(1).notNull().
+			createTestItem("CHAR NOT NULL", DataTypes.STRING()),
+			// Expect to be DataTypes.CHAR(1).
+			createTestItem("CHAR NULL", DataTypes.STRING()),
+			// Expect to be DataTypes.CHAR(33).
+			createTestItem("CHAR(33)", DataTypes.STRING()),
+			createTestItem("VARCHAR", DataTypes.STRING()),
+			// Expect to be DataTypes.VARCHAR(33).
+			createTestItem("VARCHAR(33)", DataTypes.STRING()),
+			createTestItem("STRING", DataTypes.STRING()),
+			createTestItem("BOOLEAN", DataTypes.BOOLEAN()),
+			// Expect to be DECIMAL(10, 0).
+			createTestItem("DECIMAL",
+				TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+			// Expect to be DECIMAL(10, 0).
+			createTestItem("DEC",
+				TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+			// Expect to be DECIMAL(10, 0).
+			createTestItem("NUMERIC",
+				TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+			// Expect to be DECIMAL(10, 0).
+			createTestItem("DECIMAL(10)",
+				TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+			// Expect to be DECIMAL(10, 0).
+			createTestItem("DEC(10)",
+				TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+			// Expect to be DECIMAL(10, 0).
+			createTestItem("NUMERIC(10)",
+				TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+			// Expect to be DECIMAL(10, 3).
+			createTestItem("DECIMAL(10, 3)",
+				TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+			// Expect to be DECIMAL(10, 3).
+			createTestItem("DEC(10, 3)",
+				TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+			// Expect to be DECIMAL(10, 3).
+			createTestItem("NUMERIC(10, 3)",
+				TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+			createTestItem("TINYINT", DataTypes.TINYINT()),
+			createTestItem("SMALLINT", DataTypes.SMALLINT()),
+			createTestItem("INTEGER", DataTypes.INT()),
+			createTestItem("INT", DataTypes.INT()),
+			createTestItem("BIGINT", DataTypes.BIGINT()),
+			createTestItem("FLOAT", DataTypes.FLOAT()),
+			createTestItem("DOUBLE", DataTypes.DOUBLE()),
+			createTestItem("DOUBLE PRECISION", DataTypes.DOUBLE()),
+			createTestItem("DATE",
+				TypeConversions.fromLegacyInfoToDataType(Types.SQL_DATE())),
+			createTestItem("TIME",
+				TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIME())),
+			createTestItem("TIME WITHOUT TIME ZONE",
+				TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIME())),
+			// Expect to be TIME(3).
+			createTestItem("TIME(3)",
+				TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIME())),
+			// Expect to be TIME(3).
+			createTestItem("TIME(3) WITHOUT TIME ZONE",
+				TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIME())),
+			createTestItem("TIMESTAMP",
+				TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIMESTAMP())),
+			createTestItem("TIMESTAMP WITHOUT TIME ZONE",
+				TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIMESTAMP())),
+			// Expect to be TIMESTAMP(3).
+			createTestItem("TIMESTAMP(3)",
+				TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIMESTAMP())),
+			// Expect to be TIMESTAMP(3).
+			createTestItem("TIMESTAMP(3) WITHOUT TIME ZONE",
+				TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIMESTAMP())),
+			// Expect to be ARRAY<INT NOT NULL>.
+			createTestItem("ARRAY<INT NOT NULL>",
+				DataTypes.ARRAY(DataTypes.INT())),
+			createTestItem("INT ARRAY", DataTypes.ARRAY(DataTypes.INT())),
+			// Expect to be ARRAY<INT NOT NULL>.
+			createTestItem("INT NOT NULL ARRAY",
+				DataTypes.ARRAY(DataTypes.INT())),
+			// Expect to be ARRAY<INT> NOT NULL.
+			createTestItem("INT ARRAY NOT NULL",
+				DataTypes.ARRAY(DataTypes.INT())),
+			// Expect to be MULTISET<INT NOT NULL>.
+			createTestItem("MULTISET<INT NOT NULL>",
+				DataTypes.MULTISET(DataTypes.INT())),
+			createTestItem("INT MULTISET", DataTypes.MULTISET(DataTypes.INT())),
+			// Expect to be MULTISET<INT NOT NULL>.
+			createTestItem("INT NOT NULL MULTISET",
+				DataTypes.MULTISET(DataTypes.INT())),
+			// Expect to be MULTISET<INT> NOT NULL.
+			createTestItem("INT MULTISET NOT NULL",
+				DataTypes.MULTISET(DataTypes.INT())),
+			createTestItem("MAP<BIGINT, BOOLEAN>",
+				DataTypes.MAP(DataTypes.BIGINT(), DataTypes.BOOLEAN())),
+			// Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
+			createTestItem("ROW<f0 INT NOT NULL, f1 BOOLEAN>",
+				DataTypes.ROW(
+					DataTypes.FIELD("f0", DataTypes.INT()),
+					DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+			// Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
+			createTestItem("ROW(f0 INT NOT NULL, f1 BOOLEAN)",
+				DataTypes.ROW(
+					DataTypes.FIELD("f0", DataTypes.INT()),
+					DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+			createTestItem("ROW<`f0` INT>",
+				DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
+			createTestItem("ROW(`f0` INT)",
+				DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
+			createTestItem("ROW<>", DataTypes.ROW()),
+			createTestItem("ROW()", DataTypes.ROW()),
+			// Expect to be ROW<`f0` INT NOT NULL '...', `f1` BOOLEAN '...'>.
+			createTestItem("ROW<f0 INT NOT NULL 'This is a comment.', "
+					+ "f1 BOOLEAN 'This as well.'>",
+				DataTypes.ROW(
+					DataTypes.FIELD("f0", DataTypes.INT()),
+					DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+			createTestItem("ROW<f0 INT, f1 BOOLEAN> ARRAY",
+				DataTypes.ARRAY(
+					DataTypes.ROW(
+						DataTypes.FIELD("f0", DataTypes.INT()),
+						DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+			createTestItem("ARRAY<ROW<f0 INT, f1 BOOLEAN>>",
+				DataTypes.ARRAY(
+					DataTypes.ROW(
+						DataTypes.FIELD("f0", DataTypes.INT()),
+						DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+			createTestItem("ROW<f0 INT, f1 BOOLEAN> MULTISET",
+				DataTypes.MULTISET(
+					DataTypes.ROW(
+						DataTypes.FIELD("f0", DataTypes.INT()),
+						DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+			createTestItem("MULTISET<ROW<f0 INT, f1 BOOLEAN>>",
+				DataTypes.MULTISET(
+					DataTypes.ROW(
+						DataTypes.FIELD("f0", DataTypes.INT()),
+						DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+			createTestItem("ROW<f0 Row<f00 INT, f01 BOOLEAN>, "
+					+ "f1 INT ARRAY, "
+					+ "f2 BOOLEAN MULTISET>",
+				DataTypes.ROW(DataTypes.FIELD("f0",
+					DataTypes.ROW(
+						DataTypes.FIELD("f00", DataTypes.INT()),
+						DataTypes.FIELD("f01", DataTypes.BOOLEAN()))),
+					DataTypes.FIELD("f1", DataTypes.ARRAY(DataTypes.INT())),
+					DataTypes.FIELD("f2", DataTypes.MULTISET(DataTypes.BOOLEAN()))))
+		);
+		StringBuilder buffer = new StringBuilder("create table t1(\n"); // one column "f<i> <type expr>" per item, in list order
+		for (int i = 0; i < testItems.size(); i++) {
+			buffer.append("f")
+				.append(i)
+				.append(" ")
+				.append(testItems.get(i).testExpr);
+			if (i == testItems.size() - 1) {
+				buffer.append(")");
+			} else {
+				buffer.append(",\n");
+			}
+		}
+		final String sql = buffer.toString();
+		final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+		SqlNode node = planner.parse(sql);
+		assert node instanceof SqlCreateTable; // plain Java assert: only evaluated when the JVM runs with -ea
+		Operation operation = SqlToOperationConverter.convert(planner, node);
+		TableSchema schema = ((CreateTableOperation) operation).getCatalogTable().getSchema();
+		Object[] expectedDataTypes = testItems.stream().map(item -> item.expectedType).toArray();
+		assertArrayEquals(expectedDataTypes, schema.getFieldDataTypes()); // element i must equal the type of column f<i>
+	}
+
+	@Test // Every listed type must be rejected with a TableException carrying the paired message.
+	public void testCreateTableWithUnSupportedDataTypes() {
+		final List<TestItem> testItems = Arrays.asList(
+			createTestItem("ARRAY<TIMESTAMP(3) WITH LOCAL TIME ZONE>",
+				"Type is not supported: TIMESTAMP_WITH_LOCAL_TIME_ZONE"),
+			createTestItem("TIMESTAMP(3) WITH LOCAL TIME ZONE",
+				"Type is not supported: TIMESTAMP_WITH_LOCAL_TIME_ZONE"),
+			createTestItem("TIMESTAMP WITH LOCAL TIME ZONE",
+				"Type is not supported: TIMESTAMP_WITH_LOCAL_TIME_ZONE"),
+			createTestItem("BYTES", "Type is not supported: VARBINARY"),
+			createTestItem("VARBINARY(33)", "Type is not supported: VARBINARY"),
+			createTestItem("VARBINARY", "Type is not supported: VARBINARY"),
+			createTestItem("BINARY(33)", "Type is not supported: BINARY"),
+			createTestItem("BINARY", "Type is not supported: BINARY")
+		);
+		final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+		for (TestItem item : testItems) {
+			SqlNode node = planner.parse(String.format("create table t1(\n  f0 %s)", item.testExpr));
+			assert node instanceof SqlCreateTable;
+			try { // checked per item: an ExpectedException rule would end the test at the first throw and skip the rest
+				SqlToOperationConverter.convert(planner, node);
+				throw new AssertionError("Expected a TableException for type: " + item.testExpr);
+			} catch (TableException e) {
+				org.junit.Assert.assertTrue(e.getMessage(), e.getMessage().contains(item.expectedError));
+			}
+		}
+	}
+
+	//~ Tool Methods ----------------------------------------------------------
+
+	private static TestItem createTestItem(Object... args) {
+		assert args.length == 2;
+		final String testExpr = (String) args[0];
+		TestItem testItem = TestItem.fromTestExpr(testExpr);
+		if (args[1] instanceof String) {
+			testItem.withExpectedError((String) args[1]);
+		} else {
+			testItem.withExpectedType(args[1]);
+		}
+		return testItem;
+	}
+
 	private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {
 		tableConfig.setSqlDialect(sqlDialect);
 		return planningConfigurationBuilder.createFlinkPlanner(catalogManager.getCurrentCatalog(),
 			catalogManager.getCurrentDatabase());
 	}
+
+	//~ Inner Classes ----------------------------------------------------------
+
+	private static class TestItem {
+		private final String testExpr;
+		@Nullable
+		private Object expectedType;
+		@Nullable
+		private String expectedError;
+
+		private TestItem(String testExpr) {
+			this.testExpr = testExpr;
+		}
+
+		static TestItem fromTestExpr(String testExpr) {
+			return new TestItem(testExpr);
+		}
+
+		TestItem withExpectedType(Object expectedType) {
+			this.expectedType = expectedType;
+			return this;
+		}
+
+		TestItem withExpectedError(String expectedError) {
+			this.expectedError = expectedError;
+			return this;
+		}
+
+		@Override
+		public String toString() {
+			return this.testExpr;
+		}
+	}
 }