You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/09/02 11:44:03 UTC
[flink] branch release-1.9 updated: [FLINK-13568][sql-parser] Fix DDL CREATE TABLE statement doesn't allow STRING data type
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 1f0c670 [FLINK-13568][sql-parser] Fix DDL CREATE TABLE statement doesn't allow STRING data type
1f0c670 is described below
commit 1f0c67087f19b10d1218af30282414f8b032f2a8
Author: yuzhao.cyz <yu...@alibaba-inc.com>
AuthorDate: Tue Aug 6 20:35:15 2019 +0800
[FLINK-13568][sql-parser] Fix DDL CREATE TABLE statement doesn't allow STRING data type
This closes #9354
---
.../apache/flink/sql/parser/ddl/SqlColumnType.java | 62 -----
.../flink/sql/parser/ddl/SqlCreateTable.java | 12 +-
.../flink/sql/parser/FlinkSqlParserImplTest.java | 13 ++
.../table/sqlexec/SqlToOperationConverterTest.java | 187 +++++++++++++++
.../table/sqlexec/SqlToOperationConverterTest.java | 255 +++++++++++++++++++++
5 files changed, 458 insertions(+), 71 deletions(-)
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlColumnType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlColumnType.java
deleted file mode 100644
index 3e494a72..0000000
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlColumnType.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.sql.parser.ddl;
-
-/**
- * All supported data types in DDL. Used for Create Table DDL validation.
- */
-public enum SqlColumnType {
- BOOLEAN,
- TINYINT,
- SMALLINT,
- INT,
- INTEGER,
- BIGINT,
- REAL,
- FLOAT,
- DOUBLE,
- DECIMAL,
- DATE,
- TIME,
- TIMESTAMP,
- VARCHAR,
- VARBINARY,
- ANY,
- ARRAY,
- MAP,
- ROW,
- UNSUPPORTED;
-
- /** Returns the column type with the string representation. **/
- public static SqlColumnType getType(String type) {
- if (type == null) {
- return UNSUPPORTED;
- }
- try {
- return SqlColumnType.valueOf(type.toUpperCase());
- } catch (IllegalArgumentException var1) {
- return UNSUPPORTED;
- }
- }
-
- /** Returns true if this type is unsupported. **/
- public boolean isUnsupported() {
- return this.equals(UNSUPPORTED);
- }
-}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index 980aec4..183e2d8 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -135,12 +135,6 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
if (column instanceof SqlTableColumn) {
SqlTableColumn tableColumn = (SqlTableColumn) column;
columnName = tableColumn.getName().getSimple();
- String typeName = tableColumn.getType().getTypeName().getSimple();
- if (SqlColumnType.getType(typeName).isUnsupported()) {
- throw new SqlParseException(
- column.getParserPosition(),
- "Not support type [" + typeName + "], at " + column.getParserPosition());
- }
} else if (column instanceof SqlBasicCall) {
SqlBasicCall tableColumn = (SqlBasicCall) column;
columnName = tableColumn.getOperands()[1].toString();
@@ -241,9 +235,9 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
@Override
public void unparse(
- SqlWriter writer,
- int leftPrec,
- int rightPrec) {
+ SqlWriter writer,
+ int leftPrec,
+ int rightPrec) {
writer.keyword("CREATE TABLE");
tableName.unparse(writer, leftPrec, rightPrec);
SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 97bb093..7a494b3 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -408,6 +408,19 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
}
@Test
+ public void testCreateTableWithUserDefinedType() {
+ final String sql = "create table t(\n" +
+ " a catalog1.db1.MyType1,\n" +
+ " b db2.MyType2\n" +
+ ") with (\n" +
+ " 'k1' = 'v1',\n" +
+ " 'k2' = 'v2'\n" +
+ ")";
+ final String errMsg = "UDT in DDL is not supported yet.";
+ checkFails(sql, errMsg);
+ }
+
+ @Test
public void testInvalidComputedColumn() {
checkFails("CREATE TABLE sls_stream (\n" +
" a bigint, \n" +
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 6fa3c31..1acdd88 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -48,9 +48,12 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
@@ -190,6 +193,156 @@ public class SqlToOperationConverterTest {
assertEquals(expectedStaticPartitions, sinkModifyOperation.getStaticPartitions());
}
+ @Test // TODO: tweak the tests when FLINK-13604 is fixed.
+ public void testCreateTableWithFullDataTypes() {
+ final List<TestItem> testItems = Arrays.asList(
+ createTestItem("CHAR", DataTypes.CHAR(1)),
+ createTestItem("CHAR NOT NULL", DataTypes.CHAR(1).notNull()),
+ createTestItem("CHAR NULL", DataTypes.CHAR(1)),
+ createTestItem("CHAR(33)", DataTypes.CHAR(33)),
+ createTestItem("VARCHAR", DataTypes.STRING()),
+ createTestItem("VARCHAR(33)", DataTypes.VARCHAR(33)),
+ createTestItem("STRING", DataTypes.STRING()),
+ createTestItem("BOOLEAN", DataTypes.BOOLEAN()),
+ createTestItem("BINARY", DataTypes.BINARY(1)),
+ createTestItem("BINARY(33)", DataTypes.BINARY(33)),
+ createTestItem("VARBINARY", DataTypes.BYTES()),
+ createTestItem("VARBINARY(33)", DataTypes.VARBINARY(33)),
+ createTestItem("BYTES", DataTypes.BYTES()),
+ createTestItem("DECIMAL", DataTypes.DECIMAL(10, 0)),
+ createTestItem("DEC", DataTypes.DECIMAL(10, 0)),
+ createTestItem("NUMERIC", DataTypes.DECIMAL(10, 0)),
+ createTestItem("DECIMAL(10)", DataTypes.DECIMAL(10, 0)),
+ createTestItem("DEC(10)", DataTypes.DECIMAL(10, 0)),
+ createTestItem("NUMERIC(10)", DataTypes.DECIMAL(10, 0)),
+ createTestItem("DECIMAL(10, 3)", DataTypes.DECIMAL(10, 3)),
+ createTestItem("DEC(10, 3)", DataTypes.DECIMAL(10, 3)),
+ createTestItem("NUMERIC(10, 3)", DataTypes.DECIMAL(10, 3)),
+ createTestItem("TINYINT", DataTypes.TINYINT()),
+ createTestItem("SMALLINT", DataTypes.SMALLINT()),
+ createTestItem("INTEGER", DataTypes.INT()),
+ createTestItem("INT", DataTypes.INT()),
+ createTestItem("BIGINT", DataTypes.BIGINT()),
+ createTestItem("FLOAT", DataTypes.FLOAT()),
+ createTestItem("DOUBLE", DataTypes.DOUBLE()),
+ createTestItem("DOUBLE PRECISION", DataTypes.DOUBLE()),
+ createTestItem("DATE", DataTypes.DATE()),
+ createTestItem("TIME", DataTypes.TIME()),
+ createTestItem("TIME WITHOUT TIME ZONE", DataTypes.TIME()),
+ // Expect to be TIME(3).
+ createTestItem("TIME(3)", DataTypes.TIME()),
+ // Expect to be TIME(3).
+ createTestItem("TIME(3) WITHOUT TIME ZONE", DataTypes.TIME()),
+ createTestItem("TIMESTAMP", DataTypes.TIMESTAMP(3)),
+ createTestItem("TIMESTAMP WITHOUT TIME ZONE", DataTypes.TIMESTAMP(3)),
+ createTestItem("TIMESTAMP(3)", DataTypes.TIMESTAMP(3)),
+ createTestItem("TIMESTAMP(3) WITHOUT TIME ZONE", DataTypes.TIMESTAMP(3)),
+ createTestItem("TIMESTAMP WITH LOCAL TIME ZONE",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+ createTestItem("TIMESTAMP(3) WITH LOCAL TIME ZONE",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+ createTestItem("ARRAY<TIMESTAMP(3) WITH LOCAL TIME ZONE>",
+ DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))),
+ createTestItem("ARRAY<INT NOT NULL>",
+ DataTypes.ARRAY(DataTypes.INT().notNull())),
+ createTestItem("INT ARRAY", DataTypes.ARRAY(DataTypes.INT())),
+ createTestItem("INT NOT NULL ARRAY",
+ DataTypes.ARRAY(DataTypes.INT().notNull())),
+ createTestItem("INT ARRAY NOT NULL",
+ DataTypes.ARRAY(DataTypes.INT()).notNull()),
+ createTestItem("MULTISET<INT NOT NULL>",
+ DataTypes.MULTISET(DataTypes.INT().notNull())),
+ createTestItem("INT MULTISET",
+ DataTypes.MULTISET(DataTypes.INT())),
+ createTestItem("INT NOT NULL MULTISET",
+ DataTypes.MULTISET(DataTypes.INT().notNull())),
+ createTestItem("INT MULTISET NOT NULL",
+ DataTypes.MULTISET(DataTypes.INT()).notNull()),
+ createTestItem("MAP<BIGINT, BOOLEAN>",
+ DataTypes.MAP(DataTypes.BIGINT(), DataTypes.BOOLEAN())),
+ // Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
+ createTestItem("ROW<f0 INT NOT NULL, f1 BOOLEAN>",
+ DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+ // Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
+ createTestItem("ROW(f0 INT NOT NULL, f1 BOOLEAN)",
+ DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+ createTestItem("ROW<`f0` INT>",
+ DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
+ createTestItem("ROW(`f0` INT)",
+ DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
+ createTestItem("ROW<>", DataTypes.ROW()),
+ createTestItem("ROW()", DataTypes.ROW()),
+ // Expect to be ROW<`f0` INT NOT NULL '...', `f1` BOOLEAN '...'>.
+ createTestItem("ROW<f0 INT NOT NULL 'This is a comment.',"
+ + " f1 BOOLEAN 'This as well.'>",
+ DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+ createTestItem("ARRAY<ROW<f0 INT, f1 BOOLEAN>>",
+ DataTypes.ARRAY(
+ DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+ createTestItem("ROW<f0 INT, f1 BOOLEAN> MULTISET",
+ DataTypes.MULTISET(
+ DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+ createTestItem("MULTISET<ROW<f0 INT, f1 BOOLEAN>>",
+ DataTypes.MULTISET(
+ DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+ createTestItem("ROW<f0 Row<f00 INT, f01 BOOLEAN>, "
+ + "f1 INT ARRAY, "
+ + "f2 BOOLEAN MULTISET>",
+ DataTypes.ROW(DataTypes.FIELD("f0",
+ DataTypes.ROW(
+ DataTypes.FIELD("f00", DataTypes.INT()),
+ DataTypes.FIELD("f01", DataTypes.BOOLEAN()))),
+ DataTypes.FIELD("f1", DataTypes.ARRAY(DataTypes.INT())),
+ DataTypes.FIELD("f2", DataTypes.MULTISET(DataTypes.BOOLEAN()))))
+ );
+ StringBuilder buffer = new StringBuilder("create table t1(\n");
+ for (int i = 0; i < testItems.size(); i++) {
+ buffer.append("f")
+ .append(i)
+ .append(" ")
+ .append(testItems.get(i).testExpr);
+ if (i == testItems.size() - 1) {
+ buffer.append(")");
+ } else {
+ buffer.append(",\n");
+ }
+ }
+ final String sql = buffer.toString();
+ final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+ SqlNode node = planner.parse(sql);
+ assert node instanceof SqlCreateTable;
+ Operation operation = SqlToOperationConverter.convert(planner, node);
+ TableSchema schema = ((CreateTableOperation) operation).getCatalogTable().getSchema();
+ Object[] expectedDataTypes = testItems.stream().map(item -> item.expectedType).toArray();
+ assertArrayEquals(expectedDataTypes, schema.getFieldDataTypes());
+ }
+
+ //~ Tool Methods ----------------------------------------------------------
+
+ private static TestItem createTestItem(Object... args) {
+ assert args.length == 2;
+ final String testExpr = (String) args[0];
+ TestItem testItem = TestItem.fromTestExpr(testExpr);
+ if (args[1] instanceof String) {
+ testItem.withExpectedError((String) args[1]);
+ } else {
+ testItem.withExpectedType(args[1]);
+ }
+ return testItem;
+ }
+
private Operation parse(String sql, FlinkPlannerImpl planner) {
SqlNode node = planner.parse(sql);
return SqlToOperationConverter.convert(planner, node);
@@ -200,4 +353,38 @@ public class SqlToOperationConverterTest {
return plannerContext.createFlinkPlanner(catalogManager.getCurrentCatalog(),
catalogManager.getCurrentDatabase());
}
+
+ //~ Inner Classes ----------------------------------------------------------
+
+ private static class TestItem {
+ private final String testExpr;
+ @Nullable
+ private Object expectedType;
+ @Nullable
+ private String expectedError;
+
+ private TestItem(String testExpr) {
+ this.testExpr = testExpr;
+ }
+
+ static TestItem fromTestExpr(String testExpr) {
+ return new TestItem(testExpr);
+ }
+
+ TestItem withExpectedType(Object expectedType) {
+ this.expectedType = expectedType;
+ return this;
+ }
+
+ TestItem withExpectedError(String expectedError) {
+ this.expectedError = expectedError;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return this.testExpr;
+ }
+ }
+
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 7225cbc..0424109 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -23,7 +23,9 @@ import org.apache.flink.sql.parser.dml.RichSqlInsert;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
import org.apache.flink.table.calcite.FlinkPlannerImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
@@ -43,14 +45,20 @@ import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.planner.PlanningConfigurationBuilder;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.calcite.sql.SqlNode;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
@@ -74,6 +82,9 @@ public class SqlToOperationConverterTest {
new ExpressionBridge<>(functionCatalog,
PlannerExpressionConverter.INSTANCE()));
+ @Rule
+ public ExpectedException expectedEx = ExpectedException.none();
+
@Before
public void before() throws TableAlreadyExistException, DatabaseNotExistException {
final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
@@ -195,9 +206,253 @@ public class SqlToOperationConverterTest {
assertEquals(expectedStaticPartitions, sinkModifyOperation.getStaticPartitions());
}
+ @Test // TODO: tweak the tests when FLINK-13604 is fixed.
+ public void testCreateTableWithFullDataTypes() {
+ final List<TestItem> testItems = Arrays.asList(
+ // Expect to be DataTypes.CHAR(1).
+ createTestItem("CHAR", DataTypes.STRING()),
+ // Expect to be DataTypes.CHAR(1).notNull().
+ createTestItem("CHAR NOT NULL", DataTypes.STRING()),
+ // Expect to be DataTypes.CHAR(1).
+ createTestItem("CHAR NULL", DataTypes.STRING()),
+ // Expect to be DataTypes.CHAR(33).
+ createTestItem("CHAR(33)", DataTypes.STRING()),
+ createTestItem("VARCHAR", DataTypes.STRING()),
+ // Expect to be DataTypes.VARCHAR(33).
+ createTestItem("VARCHAR(33)", DataTypes.STRING()),
+ createTestItem("STRING", DataTypes.STRING()),
+ createTestItem("BOOLEAN", DataTypes.BOOLEAN()),
+ // Expect to be DECIMAL(10, 0).
+ createTestItem("DECIMAL",
+ TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+ // Expect to be DECIMAL(10, 0).
+ createTestItem("DEC",
+ TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+ // Expect to be DECIMAL(10, 0).
+ createTestItem("NUMERIC",
+ TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+ // Expect to be DECIMAL(10, 0).
+ createTestItem("DECIMAL(10)",
+ TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+ // Expect to be DECIMAL(10, 0).
+ createTestItem("DEC(10)",
+ TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+ // Expect to be DECIMAL(10, 0).
+ createTestItem("NUMERIC(10)",
+ TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+ // Expect to be DECIMAL(10, 3).
+ createTestItem("DECIMAL(10, 3)",
+ TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+ // Expect to be DECIMAL(10, 3).
+ createTestItem("DEC(10, 3)",
+ TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+ // Expect to be DECIMAL(10, 3).
+ createTestItem("NUMERIC(10, 3)",
+ TypeConversions.fromLegacyInfoToDataType(Types.DECIMAL())),
+ createTestItem("TINYINT", DataTypes.TINYINT()),
+ createTestItem("SMALLINT", DataTypes.SMALLINT()),
+ createTestItem("INTEGER", DataTypes.INT()),
+ createTestItem("INT", DataTypes.INT()),
+ createTestItem("BIGINT", DataTypes.BIGINT()),
+ createTestItem("FLOAT", DataTypes.FLOAT()),
+ createTestItem("DOUBLE", DataTypes.DOUBLE()),
+ createTestItem("DOUBLE PRECISION", DataTypes.DOUBLE()),
+ createTestItem("DATE",
+ TypeConversions.fromLegacyInfoToDataType(Types.SQL_DATE())),
+ createTestItem("TIME",
+ TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIME())),
+ createTestItem("TIME WITHOUT TIME ZONE",
+ TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIME())),
+ // Expect to be Time(3).
+ createTestItem("TIME(3)",
+ TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIME())),
+ // Expect to be Time(3).
+ createTestItem("TIME(3) WITHOUT TIME ZONE",
+ TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIME())),
+ createTestItem("TIMESTAMP",
+ TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIMESTAMP())),
+ createTestItem("TIMESTAMP WITHOUT TIME ZONE",
+ TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIMESTAMP())),
+ // Expect to be timestamp(3).
+ createTestItem("TIMESTAMP(3)",
+ TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIMESTAMP())),
+ // Expect to be timestamp(3).
+ createTestItem("TIMESTAMP(3) WITHOUT TIME ZONE",
+ TypeConversions.fromLegacyInfoToDataType(Types.SQL_TIMESTAMP())),
+ // Expect to be ARRAY<INT NOT NULL>.
+ createTestItem("ARRAY<INT NOT NULL>",
+ DataTypes.ARRAY(DataTypes.INT())),
+ createTestItem("INT ARRAY", DataTypes.ARRAY(DataTypes.INT())),
+ // Expect to be ARRAY<INT NOT NULL>.
+ createTestItem("INT NOT NULL ARRAY",
+ DataTypes.ARRAY(DataTypes.INT())),
+ // Expect to be ARRAY<INT> NOT NULL.
+ createTestItem("INT ARRAY NOT NULL",
+ DataTypes.ARRAY(DataTypes.INT())),
+ // Expect to be MULTISET<INT NOT NULL>.
+ createTestItem("MULTISET<INT NOT NULL>",
+ DataTypes.MULTISET(DataTypes.INT())),
+ createTestItem("INT MULTISET", DataTypes.MULTISET(DataTypes.INT())),
+ // Expect to be MULTISET<INT NOT NULL>.
+ createTestItem("INT NOT NULL MULTISET",
+ DataTypes.MULTISET(DataTypes.INT())),
+ // Expect to be MULTISET<INT> NOT NULL.
+ createTestItem("INT MULTISET NOT NULL",
+ DataTypes.MULTISET(DataTypes.INT())),
+ createTestItem("MAP<BIGINT, BOOLEAN>",
+ DataTypes.MAP(DataTypes.BIGINT(), DataTypes.BOOLEAN())),
+ // Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
+ createTestItem("ROW<f0 INT NOT NULL, f1 BOOLEAN>",
+ DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+ // Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
+ createTestItem("ROW(f0 INT NOT NULL, f1 BOOLEAN)",
+ DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+ createTestItem("ROW<`f0` INT>",
+ DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
+ createTestItem("ROW(`f0` INT)",
+ DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
+ createTestItem("ROW<>", DataTypes.ROW()),
+ createTestItem("ROW()", DataTypes.ROW()),
+ // Expect to be ROW<`f0` INT NOT NULL '...', `f1` BOOLEAN '...'>.
+ createTestItem("ROW<f0 INT NOT NULL 'This is a comment.', "
+ + "f1 BOOLEAN 'This as well.'>",
+ DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+ createTestItem("ROW<f0 INT, f1 BOOLEAN> ARRAY",
+ DataTypes.ARRAY(
+ DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+ createTestItem("ARRAY<ROW<f0 INT, f1 BOOLEAN>>",
+ DataTypes.ARRAY(
+ DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+ createTestItem("ROW<f0 INT, f1 BOOLEAN> MULTISET",
+ DataTypes.MULTISET(
+ DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+ createTestItem("MULTISET<ROW<f0 INT, f1 BOOLEAN>>",
+ DataTypes.MULTISET(
+ DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
+ createTestItem("ROW<f0 Row<f00 INT, f01 BOOLEAN>, "
+ + "f1 INT ARRAY, "
+ + "f2 BOOLEAN MULTISET>",
+ DataTypes.ROW(DataTypes.FIELD("f0",
+ DataTypes.ROW(
+ DataTypes.FIELD("f00", DataTypes.INT()),
+ DataTypes.FIELD("f01", DataTypes.BOOLEAN()))),
+ DataTypes.FIELD("f1", DataTypes.ARRAY(DataTypes.INT())),
+ DataTypes.FIELD("f2", DataTypes.MULTISET(DataTypes.BOOLEAN()))))
+ );
+ StringBuilder buffer = new StringBuilder("create table t1(\n");
+ for (int i = 0; i < testItems.size(); i++) {
+ buffer.append("f")
+ .append(i)
+ .append(" ")
+ .append(testItems.get(i).testExpr);
+ if (i == testItems.size() - 1) {
+ buffer.append(")");
+ } else {
+ buffer.append(",\n");
+ }
+ }
+ final String sql = buffer.toString();
+ final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+ SqlNode node = planner.parse(sql);
+ assert node instanceof SqlCreateTable;
+ Operation operation = SqlToOperationConverter.convert(planner, node);
+ TableSchema schema = ((CreateTableOperation) operation).getCatalogTable().getSchema();
+ Object[] expectedDataTypes = testItems.stream().map(item -> item.expectedType).toArray();
+ assertArrayEquals(expectedDataTypes, schema.getFieldDataTypes());
+ }
+
+ @Test
+ public void testCreateTableWithUnSupportedDataTypes() {
+ final List<TestItem> testItems = Arrays.asList(
+ createTestItem("ARRAY<TIMESTAMP(3) WITH LOCAL TIME ZONE>",
+ "Type is not supported: TIMESTAMP_WITH_LOCAL_TIME_ZONE"),
+ createTestItem("TIMESTAMP(3) WITH LOCAL TIME ZONE",
+ "Type is not supported: TIMESTAMP_WITH_LOCAL_TIME_ZONE"),
+ createTestItem("TIMESTAMP WITH LOCAL TIME ZONE",
+ "Type is not supported: TIMESTAMP_WITH_LOCAL_TIME_ZONE"),
+ createTestItem("BYTES", "Type is not supported: VARBINARY"),
+ createTestItem("VARBINARY(33)", "Type is not supported: VARBINARY"),
+ createTestItem("VARBINARY", "Type is not supported: VARBINARY"),
+ createTestItem("BINARY(33)", "Type is not supported: BINARY"),
+ createTestItem("BINARY", "Type is not supported: BINARY")
+ );
+ final String sqlTemplate = "create table t1(\n" +
+ " f0 %s)";
+ final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+ for (TestItem item : testItems) {
+ String sql = String.format(sqlTemplate, item.testExpr);
+ SqlNode node = planner.parse(sql);
+ assert node instanceof SqlCreateTable;
+ expectedEx.expect(TableException.class);
+ expectedEx.expectMessage(item.expectedError);
+ SqlToOperationConverter.convert(planner, node);
+ }
+ }
+
+ //~ Tool Methods ----------------------------------------------------------
+
+ private static TestItem createTestItem(Object... args) {
+ assert args.length == 2;
+ final String testExpr = (String) args[0];
+ TestItem testItem = TestItem.fromTestExpr(testExpr);
+ if (args[1] instanceof String) {
+ testItem.withExpectedError((String) args[1]);
+ } else {
+ testItem.withExpectedType(args[1]);
+ }
+ return testItem;
+ }
+
private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {
tableConfig.setSqlDialect(sqlDialect);
return planningConfigurationBuilder.createFlinkPlanner(catalogManager.getCurrentCatalog(),
catalogManager.getCurrentDatabase());
}
+
+ //~ Inner Classes ----------------------------------------------------------
+
+ private static class TestItem {
+ private final String testExpr;
+ @Nullable
+ private Object expectedType;
+ @Nullable
+ private String expectedError;
+
+ private TestItem(String testExpr) {
+ this.testExpr = testExpr;
+ }
+
+ static TestItem fromTestExpr(String testExpr) {
+ return new TestItem(testExpr);
+ }
+
+ TestItem withExpectedType(Object expectedType) {
+ this.expectedType = expectedType;
+ return this;
+ }
+
+ TestItem withExpectedError(String expectedError) {
+ this.expectedError = expectedError;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return this.testExpr;
+ }
+ }
}