You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/07/29 10:11:06 UTC
[flink] branch master updated: [FLINK-28463][sql-parser] Supports CREATE TABLE AS SELECT syntax (#20252)
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ed8d3acf454 [FLINK-28463][sql-parser] Supports CREATE TABLE AS SELECT syntax (#20252)
ed8d3acf454 is described below
commit ed8d3acf454e62edae458496473a6cf9c20daa24
Author: zhangmang <zh...@163.com>
AuthorDate: Fri Jul 29 18:10:59 2022 +0800
[FLINK-28463][sql-parser] Supports CREATE TABLE AS SELECT syntax (#20252)
---
.../sql/parser/hive/ddl/SqlCreateHiveTable.java | 1 -
.../src/main/codegen/data/Parser.tdd | 2 +
.../src/main/codegen/includes/parserImpls.ftl | 35 ++++-
.../flink/sql/parser/ddl/SqlCreateTable.java | 48 +++---
.../flink/sql/parser/ddl/SqlCreateTableAs.java | 163 +++++++++++++++++++++
.../flink/sql/parser/ddl/SqlCreateTableLike.java | 136 +++++++++++++++++
.../flink/sql/parser/CreateTableLikeTest.java | 6 +-
.../flink/sql/parser/FlinkSqlParserImplTest.java | 62 ++++++++
.../operations/SqlCreateTableConverter.java | 5 +-
9 files changed, 429 insertions(+), 29 deletions(-)
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java
index e358db17da6..84659f6ffab 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java
@@ -86,7 +86,6 @@ public class SqlCreateHiveTable extends SqlCreateTable {
extractPartColIdentifiers(partColList),
null,
HiveDDLUtils.unescapeStringLiteral(comment),
- null,
isTemporary,
ifNotExists);
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 05cf6ddffa8..8c87f645d02 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -52,6 +52,8 @@
"org.apache.flink.sql.parser.ddl.SqlCreateFunction"
"org.apache.flink.sql.parser.ddl.SqlCreateTable"
"org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext"
+ "org.apache.flink.sql.parser.ddl.SqlCreateTableAs"
+ "org.apache.flink.sql.parser.ddl.SqlCreateTableLike"
"org.apache.flink.sql.parser.ddl.SqlCreateView"
"org.apache.flink.sql.parser.ddl.SqlDropCatalog"
"org.apache.flink.sql.parser.ddl.SqlDropDatabase"
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index d6d6284ca0e..19677305cd2 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1111,6 +1111,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
SqlNodeList columnList = SqlNodeList.EMPTY;
SqlCharStringLiteral comment = null;
SqlTableLike tableLike = null;
+ SqlNode asQuery = null;
SqlNodeList propertyList = SqlNodeList.EMPTY;
SqlNodeList partitionColumns = SqlNodeList.EMPTY;
@@ -1151,9 +1152,8 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
[
<LIKE>
tableLike = SqlTableLike(getPos())
- ]
- {
- return new SqlCreateTable(startPos.plus(getPos()),
+ {
+ return new SqlCreateTableLike(startPos.plus(getPos()),
tableName,
columnList,
constraints,
@@ -1164,6 +1164,35 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
tableLike,
isTemporary,
ifNotExists);
+ }
+ |
+ <AS>
+ asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
+ {
+ return new SqlCreateTableAs(startPos.plus(getPos()),
+ tableName,
+ columnList,
+ constraints,
+ propertyList,
+ partitionColumns,
+ watermark,
+ comment,
+ asQuery,
+ isTemporary,
+ ifNotExists);
+ }
+ ]
+ {
+ return new SqlCreateTable(startPos.plus(getPos()),
+ tableName,
+ columnList,
+ constraints,
+ propertyList,
+ partitionColumns,
+ watermark,
+ comment,
+ isTemporary,
+ ifNotExists);
}
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index 0632e2977ab..53ca49c8347 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -72,8 +72,6 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
private final SqlCharStringLiteral comment;
- private final SqlTableLike tableLike;
-
private final boolean isTemporary;
public SqlCreateTable(
@@ -85,10 +83,35 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
SqlNodeList partitionKeyList,
@Nullable SqlWatermark watermark,
@Nullable SqlCharStringLiteral comment,
- @Nullable SqlTableLike tableLike,
boolean isTemporary,
boolean ifNotExists) {
- super(OPERATOR, pos, false, ifNotExists);
+ this(
+ OPERATOR,
+ pos,
+ tableName,
+ columnList,
+ tableConstraints,
+ propertyList,
+ partitionKeyList,
+ watermark,
+ comment,
+ isTemporary,
+ ifNotExists);
+ }
+
+ protected SqlCreateTable(
+ SqlSpecialOperator operator,
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ SqlNodeList columnList,
+ List<SqlTableConstraint> tableConstraints,
+ SqlNodeList propertyList,
+ SqlNodeList partitionKeyList,
+ @Nullable SqlWatermark watermark,
+ @Nullable SqlCharStringLiteral comment,
+ boolean isTemporary,
+ boolean ifNotExists) {
+ super(operator, pos, false, ifNotExists);
this.tableName = requireNonNull(tableName, "tableName should not be null");
this.columnList = requireNonNull(columnList, "columnList should not be null");
this.tableConstraints =
@@ -98,7 +121,6 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
requireNonNull(partitionKeyList, "partitionKeyList should not be null");
this.watermark = watermark;
this.comment = comment;
- this.tableLike = tableLike;
this.isTemporary = isTemporary;
}
@@ -116,8 +138,7 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
propertyList,
partitionKeyList,
watermark,
- comment,
- tableLike);
+ comment);
}
public SqlIdentifier getTableName() {
@@ -148,10 +169,6 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
return Optional.ofNullable(comment);
}
- public Optional<SqlTableLike> getTableLike() {
- return Optional.ofNullable(tableLike);
- }
-
public boolean isIfNotExists() {
return ifNotExists;
}
@@ -185,10 +202,6 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
}
}
}
-
- if (tableLike != null) {
- tableLike.validate();
- }
}
public boolean hasRegularColumnsOnly() {
@@ -297,11 +310,6 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
writer.newlineAndIndent();
writer.endList(withFrame);
}
-
- if (this.tableLike != null) {
- writer.newlineAndIndent();
- this.tableLike.unparse(writer, leftPrec, rightPrec);
- }
}
/** Table creation context. */
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
new file mode 100644
index 00000000000..986d8dadab4
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link SqlNode} to describe the CREATE TABLE AS syntax. A CTAS statement creates a pipeline that
+ * computes the result of the given query and inserts the data into the derived table.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * CREATE TABLE base_table (
+ * id BIGINT,
+ * name STRING,
+ * tstmp TIMESTAMP,
+ * PRIMARY KEY(id)
+ * ) WITH (
+ * 'connector' = 'kafka',
+ * 'connector.starting-offset' = '12345',
+ * 'format' = 'json'
+ * )
+ *
+ * CREATE TABLE derived_table
+ * WITH (
+ * 'connector' = 'jdbc',
+ * 'url' = 'http://localhost:10000',
+ * 'table-name' = 'syncedTable'
+ * )
+ * AS SELECT * FROM base_table;
+ * }</pre>
+ */
+public class SqlCreateTableAs extends SqlCreateTable {
+
+ public static final SqlSpecialOperator OPERATOR =
+ new SqlSpecialOperator("CREATE TABLE AS", SqlKind.CREATE_TABLE);
+
+ private final SqlNode asQuery;
+
+ public SqlCreateTableAs(
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ SqlNodeList columnList,
+ List<SqlTableConstraint> tableConstraints,
+ SqlNodeList propertyList,
+ SqlNodeList partitionKeyList,
+ @Nullable SqlWatermark watermark,
+ @Nullable SqlCharStringLiteral comment,
+ SqlNode asQuery,
+ boolean isTemporary,
+ boolean ifNotExists) {
+ super(
+ OPERATOR,
+ pos,
+ tableName,
+ columnList,
+ tableConstraints,
+ propertyList,
+ partitionKeyList,
+ watermark,
+ comment,
+ isTemporary,
+ ifNotExists);
+ this.asQuery =
+ requireNonNull(asQuery, "As clause is required for CREATE TABLE AS SELECT DDL");
+ }
+
+ @Override
+ public @Nonnull SqlOperator getOperator() {
+ return OPERATOR;
+ }
+
+ @Override
+ public @Nonnull List<SqlNode> getOperandList() {
+ return ImmutableNullableList.<SqlNode>builder()
+ .addAll(super.getOperandList())
+ .add(asQuery)
+ .build();
+ }
+
+ @Override
+ public void validate() throws SqlValidateException {
+ super.validate();
+ if (isTemporary()) {
+ throw new SqlValidateException(
+ getParserPosition(),
+ "CREATE TABLE AS SELECT syntax does not support to create temporary table yet.");
+ }
+
+ if (getColumnList().size() > 0) {
+ throw new SqlValidateException(
+ getParserPosition(),
+ "CREATE TABLE AS SELECT syntax does not support to specify explicit columns yet.");
+ }
+
+ if (getWatermark().isPresent()) {
+ throw new SqlValidateException(
+ getParserPosition(),
+ "CREATE TABLE AS SELECT syntax does not support to specify explicit watermark yet.");
+ }
+ // TODO flink dialect supports dynamic partition
+ if (getPartitionKeyList().size() > 0) {
+ throw new SqlValidateException(
+ getParserPosition(),
+ "CREATE TABLE AS SELECT syntax does not support to create partitioned table yet.");
+ }
+ if (getFullConstraints().stream().anyMatch(SqlTableConstraint::isPrimaryKey)) {
+ throw new SqlValidateException(
+ getParserPosition(),
+ "CREATE TABLE AS SELECT syntax does not support primary key constraints yet.");
+ }
+ }
+
+ public SqlNode getAsQuery() {
+ return asQuery;
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ super.unparse(writer, leftPrec, rightPrec);
+
+ writer.newlineAndIndent();
+ writer.keyword("AS");
+ writer.newlineAndIndent();
+ this.asQuery.unparse(writer, leftPrec, rightPrec);
+ }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableLike.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableLike.java
new file mode 100644
index 00000000000..879701135e2
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableLike.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link SqlNode} to describe the CREATE TABLE LIKE syntax. It is similar to the CREATE TABLE
+ * syntax, except that it has a LIKE sub-clause to inherit the properties of an existing table.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * CREATE TABLE base_table (
+ * id BIGINT,
+ * name STRING,
+ * tstmp TIMESTAMP,
+ * PRIMARY KEY(id)
+ * ) WITH (
+ * 'connector' = 'kafka',
+ * 'connector.starting-offset' = '12345',
+ * 'format' = 'json'
+ * )
+ *
+ * CREATE TABLE derived_table (
+ * a int
+ * )
+ * WITH (
+ * 'connector' = 'jdbc',
+ * 'url' = 'http://localhost:10000',
+ * 'table-name' = 'derivedTable'
+ * )
+ * LIKE base_table;
+ * }</pre>
+ */
+public class SqlCreateTableLike extends SqlCreateTable {
+
+ public static final SqlSpecialOperator OPERATOR =
+ new SqlSpecialOperator("CREATE TABLE LIKE", SqlKind.CREATE_TABLE);
+
+ private final SqlTableLike tableLike;
+
+ public SqlCreateTableLike(
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ SqlNodeList columnList,
+ List<SqlTableConstraint> tableConstraints,
+ SqlNodeList propertyList,
+ SqlNodeList partitionKeyList,
+ @Nullable SqlWatermark watermark,
+ @Nullable SqlCharStringLiteral comment,
+ SqlTableLike tableLike,
+ boolean isTemporary,
+ boolean ifNotExists) {
+ super(
+ OPERATOR,
+ pos,
+ tableName,
+ columnList,
+ tableConstraints,
+ propertyList,
+ partitionKeyList,
+ watermark,
+ comment,
+ isTemporary,
+ ifNotExists);
+ this.tableLike =
+ requireNonNull(tableLike, "LIKE clause is required for CREATE TABLE LIKE DDL");
+ }
+
+ @Override
+ public @Nonnull SqlOperator getOperator() {
+ return OPERATOR;
+ }
+
+ @Override
+ public @Nonnull List<SqlNode> getOperandList() {
+ return ImmutableNullableList.<SqlNode>builder()
+ .addAll(super.getOperandList())
+ .add(tableLike)
+ .build();
+ }
+
+ @Override
+ public void validate() throws SqlValidateException {
+ super.validate();
+ tableLike.validate();
+ }
+
+ public SqlTableLike getTableLike() {
+ return tableLike;
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ super.unparse(writer, leftPrec, rightPrec);
+
+ writer.newlineAndIndent();
+ this.tableLike.unparse(writer, leftPrec, rightPrec);
+ }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/CreateTableLikeTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/CreateTableLikeTest.java
index 0acbd7a43af..ca10a868656 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/CreateTableLikeTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/CreateTableLikeTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.sql.parser;
-import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.ddl.SqlCreateTableLike;
import org.apache.flink.sql.parser.ddl.SqlTableLike;
import org.apache.flink.sql.parser.ddl.SqlTableLike.FeatureOption;
import org.apache.flink.sql.parser.ddl.SqlTableLike.MergingStrategy;
@@ -251,10 +251,10 @@ class CreateTableLikeTest {
@Override
protected SqlTableLike featureValueOf(SqlNode actual) {
- if (!(actual instanceof SqlCreateTable)) {
+ if (!(actual instanceof SqlCreateTableLike)) {
throw new AssertionError("Node is not a CREATE TABLE stmt.");
}
- return ((SqlCreateTable) actual).getTableLike().orElse(null);
+ return ((SqlCreateTableLike) actual).getTableLike();
}
};
}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 3408ddf1443..b6dd3fdf74e 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -1888,6 +1888,68 @@ class FlinkSqlParserImplTest extends SqlParserTest {
.fails("(?s).*Encountered \"\\,\" at line 1, column 32.\n.*");
}
+ @Test
+ void testCreateTableAsSelectWithoutOptions() {
+ sql("CREATE TABLE t AS SELECT * FROM b").ok("CREATE TABLE `T`\nAS\nSELECT *\nFROM `B`");
+ }
+
+ @Test
+ void testCreateTableAsSelectWithOptions() {
+ sql("CREATE TABLE t WITH ('test' = 'zm') AS SELECT * FROM b")
+ .ok("CREATE TABLE `T` WITH (\n 'test' = 'zm'\n)\nAS\nSELECT *\nFROM `B`");
+ }
+
+ @Test
+ void testCreateTableAsSelectWithCreateTableLike() {
+ sql("CREATE TABLE t (col1 string) WITH ('test' = 'zm') like b ^AS^ SELECT col1 FROM b")
+ .fails("(?s).*Encountered \"AS\" at line 1, column 58.*");
+ }
+
+ @Test
+ void testCreateTableAsSelectWithTmpTable() {
+ sql("CREATE TEMPORARY TABLE t (col1 string) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(
+ new ValidationMatcher()
+ .fails(
+ "CREATE TABLE AS SELECT syntax does not support to create temporary table yet."));
+ }
+
+ @Test
+ void testCreateTableAsSelectWithExplicitColumns() {
+ sql("CREATE TABLE t (col1 string) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(
+ new ValidationMatcher()
+ .fails(
+ "CREATE TABLE AS SELECT syntax does not support to specify explicit columns yet."));
+ }
+
+ @Test
+ void testCreateTableAsSelectWithWatermark() {
+ sql("CREATE TABLE t (watermark FOR ts AS ts - interval '3' second) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(
+ new ValidationMatcher()
+ .fails(
+ "CREATE TABLE AS SELECT syntax does not support to specify explicit watermark yet."));
+ }
+
+ @Test
+ void testCreateTableAsSelectWithConstraints() {
+ sql("CREATE TABLE t (PRIMARY KEY (col1)) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(
+ new ValidationMatcher()
+ .fails(
+ "CREATE TABLE AS SELECT syntax does not support primary key constraints yet."));
+ }
+
+ @Test
+ void testCreateTableAsSelectWithPartitionKey() {
+ sql("CREATE TABLE t PARTITIONED BY(col1) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(
+ new ValidationMatcher()
+ .fails(
+ "CREATE TABLE AS SELECT syntax does not support to create partitioned table yet."));
+ }
+
public static BaseMatcher<SqlNode> validated(String validatedSql) {
return new TypeSafeDiagnosingMatcher<SqlNode>() {
@Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index 28695b013f2..0e6b51d1e3e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.operations;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.ddl.SqlCreateTableLike;
import org.apache.flink.sql.parser.ddl.SqlTableLike;
import org.apache.flink.sql.parser.ddl.SqlTableOption;
import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
@@ -87,8 +88,8 @@ class SqlCreateTableConverter {
final List<String> sourcePartitionKeys;
final List<SqlTableLike.SqlTableLikeOption> likeOptions;
final Map<String, String> sourceProperties;
- if (sqlCreateTable.getTableLike().isPresent()) {
- SqlTableLike sqlTableLike = sqlCreateTable.getTableLike().get();
+ if (sqlCreateTable instanceof SqlCreateTableLike) {
+ SqlTableLike sqlTableLike = ((SqlCreateTableLike) sqlCreateTable).getTableLike();
CatalogTable table = lookupLikeSourceTable(sqlTableLike);
sourceTableSchema =
TableSchema.fromResolvedSchema(