You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/07/29 10:11:06 UTC

[flink] branch master updated: [FLINK-28463][sql-parser] Supports CREATE TABLE AS SELECT syntax (#20252)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ed8d3acf454 [FLINK-28463][sql-parser] Supports CREATE TABLE AS SELECT syntax (#20252)
ed8d3acf454 is described below

commit ed8d3acf454e62edae458496473a6cf9c20daa24
Author: zhangmang <zh...@163.com>
AuthorDate: Fri Jul 29 18:10:59 2022 +0800

    [FLINK-28463][sql-parser] Supports CREATE TABLE AS SELECT syntax (#20252)
---
 .../sql/parser/hive/ddl/SqlCreateHiveTable.java    |   1 -
 .../src/main/codegen/data/Parser.tdd               |   2 +
 .../src/main/codegen/includes/parserImpls.ftl      |  35 ++++-
 .../flink/sql/parser/ddl/SqlCreateTable.java       |  48 +++---
 .../flink/sql/parser/ddl/SqlCreateTableAs.java     | 163 +++++++++++++++++++++
 .../flink/sql/parser/ddl/SqlCreateTableLike.java   | 136 +++++++++++++++++
 .../flink/sql/parser/CreateTableLikeTest.java      |   6 +-
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  62 ++++++++
 .../operations/SqlCreateTableConverter.java        |   5 +-
 9 files changed, 429 insertions(+), 29 deletions(-)

diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java
index e358db17da6..84659f6ffab 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlCreateHiveTable.java
@@ -86,7 +86,6 @@ public class SqlCreateHiveTable extends SqlCreateTable {
                 extractPartColIdentifiers(partColList),
                 null,
                 HiveDDLUtils.unescapeStringLiteral(comment),
-                null,
                 isTemporary,
                 ifNotExists);
 
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 05cf6ddffa8..8c87f645d02 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -52,6 +52,8 @@
     "org.apache.flink.sql.parser.ddl.SqlCreateFunction"
     "org.apache.flink.sql.parser.ddl.SqlCreateTable"
     "org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext"
+    "org.apache.flink.sql.parser.ddl.SqlCreateTableAs"
+    "org.apache.flink.sql.parser.ddl.SqlCreateTableLike"
     "org.apache.flink.sql.parser.ddl.SqlCreateView"
     "org.apache.flink.sql.parser.ddl.SqlDropCatalog"
     "org.apache.flink.sql.parser.ddl.SqlDropDatabase"
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index d6d6284ca0e..19677305cd2 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1111,6 +1111,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
     SqlNodeList columnList = SqlNodeList.EMPTY;
 	SqlCharStringLiteral comment = null;
 	SqlTableLike tableLike = null;
+    SqlNode asQuery = null;
 
     SqlNodeList propertyList = SqlNodeList.EMPTY;
     SqlNodeList partitionColumns = SqlNodeList.EMPTY;
@@ -1151,9 +1152,8 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
     [
         <LIKE>
         tableLike = SqlTableLike(getPos())
-    ]
-    {
-        return new SqlCreateTable(startPos.plus(getPos()),
+        {
+            return new SqlCreateTableLike(startPos.plus(getPos()),
                 tableName,
                 columnList,
                 constraints,
@@ -1164,6 +1164,35 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
                 tableLike,
                 isTemporary,
                 ifNotExists);
+        }
+    |
+        <AS>
+        asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
+        {
+            return new SqlCreateTableAs(startPos.plus(getPos()),
+                tableName,
+                columnList,
+                constraints,
+                propertyList,
+                partitionColumns,
+                watermark,
+                comment,
+                asQuery,
+                isTemporary,
+                ifNotExists);
+        }
+    ]
+    {
+        return new SqlCreateTable(startPos.plus(getPos()),
+            tableName,
+            columnList,
+            constraints,
+            propertyList,
+            partitionColumns,
+            watermark,
+            comment,
+            isTemporary,
+            ifNotExists);
     }
 }
 
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index 0632e2977ab..53ca49c8347 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -72,8 +72,6 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
 
     private final SqlCharStringLiteral comment;
 
-    private final SqlTableLike tableLike;
-
     private final boolean isTemporary;
 
     public SqlCreateTable(
@@ -85,10 +83,35 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
             SqlNodeList partitionKeyList,
             @Nullable SqlWatermark watermark,
             @Nullable SqlCharStringLiteral comment,
-            @Nullable SqlTableLike tableLike,
             boolean isTemporary,
             boolean ifNotExists) {
-        super(OPERATOR, pos, false, ifNotExists);
+        this(
+                OPERATOR,
+                pos,
+                tableName,
+                columnList,
+                tableConstraints,
+                propertyList,
+                partitionKeyList,
+                watermark,
+                comment,
+                isTemporary,
+                ifNotExists);
+    }
+
+    protected SqlCreateTable(
+            SqlSpecialOperator operator,
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList columnList,
+            List<SqlTableConstraint> tableConstraints,
+            SqlNodeList propertyList,
+            SqlNodeList partitionKeyList,
+            @Nullable SqlWatermark watermark,
+            @Nullable SqlCharStringLiteral comment,
+            boolean isTemporary,
+            boolean ifNotExists) {
+        super(operator, pos, false, ifNotExists);
         this.tableName = requireNonNull(tableName, "tableName should not be null");
         this.columnList = requireNonNull(columnList, "columnList should not be null");
         this.tableConstraints =
@@ -98,7 +121,6 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
                 requireNonNull(partitionKeyList, "partitionKeyList should not be null");
         this.watermark = watermark;
         this.comment = comment;
-        this.tableLike = tableLike;
         this.isTemporary = isTemporary;
     }
 
@@ -116,8 +138,7 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
                 propertyList,
                 partitionKeyList,
                 watermark,
-                comment,
-                tableLike);
+                comment);
     }
 
     public SqlIdentifier getTableName() {
@@ -148,10 +169,6 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
         return Optional.ofNullable(comment);
     }
 
-    public Optional<SqlTableLike> getTableLike() {
-        return Optional.ofNullable(tableLike);
-    }
-
     public boolean isIfNotExists() {
         return ifNotExists;
     }
@@ -185,10 +202,6 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
                 }
             }
         }
-
-        if (tableLike != null) {
-            tableLike.validate();
-        }
     }
 
     public boolean hasRegularColumnsOnly() {
@@ -297,11 +310,6 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
             writer.newlineAndIndent();
             writer.endList(withFrame);
         }
-
-        if (this.tableLike != null) {
-            writer.newlineAndIndent();
-            this.tableLike.unparse(writer, leftPrec, rightPrec);
-        }
     }
 
     /** Table creation context. */
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
new file mode 100644
index 00000000000..986d8dadab4
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link SqlNode} to describe the CREATE TABLE AS syntax. The CTAS would create a pipeline to
+ * compute the result of the given query and insert data into the derived table.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * CREATE TABLE base_table (
+ *     id BIGINT,
+ *     name STRING,
+ *     tstmp TIMESTAMP,
+ *     PRIMARY KEY(id)
+ * ) WITH (
+ *     'connector' = 'kafka',
+ *     'connector.starting-offset' = '12345',
+ *     'format' = 'json'
+ * );
+ *
+ * CREATE TABLE derived_table
+ * WITH (
+ *   'connector' = 'jdbc',
+ *   'url' = 'http://localhost:10000',
+ *   'table-name' = 'syncedTable'
+ * )
+ * AS SELECT * FROM base_table;
+ * }</pre>
+ */
+public class SqlCreateTableAs extends SqlCreateTable {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("CREATE TABLE AS", SqlKind.CREATE_TABLE);
+
+    private final SqlNode asQuery;
+
+    public SqlCreateTableAs(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList columnList,
+            List<SqlTableConstraint> tableConstraints,
+            SqlNodeList propertyList,
+            SqlNodeList partitionKeyList,
+            @Nullable SqlWatermark watermark,
+            @Nullable SqlCharStringLiteral comment,
+            SqlNode asQuery,
+            boolean isTemporary,
+            boolean ifNotExists) {
+        super(
+                OPERATOR,
+                pos,
+                tableName,
+                columnList,
+                tableConstraints,
+                propertyList,
+                partitionKeyList,
+                watermark,
+                comment,
+                isTemporary,
+                ifNotExists);
+        this.asQuery =
+                requireNonNull(asQuery, "As clause is required for CREATE TABLE AS SELECT DDL");
+    }
+
+    @Override
+    public @Nonnull SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public @Nonnull List<SqlNode> getOperandList() {
+        return ImmutableNullableList.<SqlNode>builder()
+                .addAll(super.getOperandList())
+                .add(asQuery)
+                .build();
+    }
+
+    @Override
+    public void validate() throws SqlValidateException {
+        super.validate();
+        if (isTemporary()) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    "CREATE TABLE AS SELECT syntax does not support to create temporary table yet.");
+        }
+
+        if (getColumnList().size() > 0) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    "CREATE TABLE AS SELECT syntax does not support to specify explicit columns yet.");
+        }
+
+        if (getWatermark().isPresent()) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    "CREATE TABLE AS SELECT syntax does not support to specify explicit watermark yet.");
+        }
+        // TODO flink dialect supports dynamic partition
+        if (getPartitionKeyList().size() > 0) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    "CREATE TABLE AS SELECT syntax does not support to create partitioned table yet.");
+        }
+        if (getFullConstraints().stream().anyMatch(SqlTableConstraint::isPrimaryKey)) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    "CREATE TABLE AS SELECT syntax does not support primary key constraints yet.");
+        }
+    }
+
+    public SqlNode getAsQuery() {
+        return asQuery;
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        super.unparse(writer, leftPrec, rightPrec);
+
+        writer.newlineAndIndent();
+        writer.keyword("AS");
+        writer.newlineAndIndent();
+        this.asQuery.unparse(writer, leftPrec, rightPrec);
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableLike.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableLike.java
new file mode 100644
index 00000000000..879701135e2
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableLike.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link SqlNode} to describe the CREATE TABLE LIKE syntax. It is similar to the CREATE TABLE
+ * syntax, but adds a LIKE sub-clause to inherit properties from an existing table.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * CREATE TABLE base_table (
+ *     id BIGINT,
+ *     name STRING,
+ *     tstmp TIMESTAMP,
+ *     PRIMARY KEY(id)
+ * ) WITH (
+ *     'connector' = 'kafka',
+ *     'connector.starting-offset' = '12345',
+ *     'format' = 'json'
+ * );
+ *
+ * CREATE TABLE derived_table (
+ *      a int
+ * )
+ * WITH (
+ *   'connector' = 'jdbc',
+ *   'url' = 'http://localhost:10000',
+ *   'table-name' = 'derivedTable'
+ * )
+ * LIKE base_table;
+ * }</pre>
+ */
+public class SqlCreateTableLike extends SqlCreateTable {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("CREATE TABLE LIKE", SqlKind.CREATE_TABLE);
+
+    private final SqlTableLike tableLike;
+
+    public SqlCreateTableLike(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList columnList,
+            List<SqlTableConstraint> tableConstraints,
+            SqlNodeList propertyList,
+            SqlNodeList partitionKeyList,
+            @Nullable SqlWatermark watermark,
+            @Nullable SqlCharStringLiteral comment,
+            SqlTableLike tableLike,
+            boolean isTemporary,
+            boolean ifNotExists) {
+        super(
+                OPERATOR,
+                pos,
+                tableName,
+                columnList,
+                tableConstraints,
+                propertyList,
+                partitionKeyList,
+                watermark,
+                comment,
+                isTemporary,
+                ifNotExists);
+        this.tableLike =
+                requireNonNull(tableLike, "LIKE clause is required for CREATE TABLE LIKE DDL");
+    }
+
+    @Override
+    public @Nonnull SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public @Nonnull List<SqlNode> getOperandList() {
+        return ImmutableNullableList.<SqlNode>builder()
+                .addAll(super.getOperandList())
+                .add(tableLike)
+                .build();
+    }
+
+    @Override
+    public void validate() throws SqlValidateException {
+        super.validate();
+        tableLike.validate();
+    }
+
+    public SqlTableLike getTableLike() {
+        return tableLike;
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        super.unparse(writer, leftPrec, rightPrec);
+
+        writer.newlineAndIndent();
+        this.tableLike.unparse(writer, leftPrec, rightPrec);
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/CreateTableLikeTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/CreateTableLikeTest.java
index 0acbd7a43af..ca10a868656 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/CreateTableLikeTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/CreateTableLikeTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.sql.parser;
 
-import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.ddl.SqlCreateTableLike;
 import org.apache.flink.sql.parser.ddl.SqlTableLike;
 import org.apache.flink.sql.parser.ddl.SqlTableLike.FeatureOption;
 import org.apache.flink.sql.parser.ddl.SqlTableLike.MergingStrategy;
@@ -251,10 +251,10 @@ class CreateTableLikeTest {
 
             @Override
             protected SqlTableLike featureValueOf(SqlNode actual) {
-                if (!(actual instanceof SqlCreateTable)) {
+                if (!(actual instanceof SqlCreateTableLike)) {
                     throw new AssertionError("Node is not a CREATE TABLE stmt.");
                 }
-                return ((SqlCreateTable) actual).getTableLike().orElse(null);
+                return ((SqlCreateTableLike) actual).getTableLike();
             }
         };
     }
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 3408ddf1443..b6dd3fdf74e 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -1888,6 +1888,68 @@ class FlinkSqlParserImplTest extends SqlParserTest {
                 .fails("(?s).*Encountered \"\\,\" at line 1, column 32.\n.*");
     }
 
+    @Test
+    void testCreateTableAsSelectWithoutOptions() {
+        sql("CREATE TABLE t AS SELECT * FROM b").ok("CREATE TABLE `T`\nAS\nSELECT *\nFROM `B`");
+    }
+
+    @Test
+    void testCreateTableAsSelectWithOptions() {
+        sql("CREATE TABLE t WITH ('test' = 'zm') AS SELECT * FROM b")
+                .ok("CREATE TABLE `T` WITH (\n  'test' = 'zm'\n)\nAS\nSELECT *\nFROM `B`");
+    }
+
+    @Test
+    void testCreateTableAsSelectWithCreateTableLike() {
+        sql("CREATE TABLE t (col1 string) WITH ('test' = 'zm') like b ^AS^ SELECT col1 FROM b")
+                .fails("(?s).*Encountered \"AS\" at line 1, column 58.*");
+    }
+
+    @Test
+    void testCreateTableAsSelectWithTmpTable() {
+        sql("CREATE TEMPORARY TABLE t (col1 string) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(
+                        new ValidationMatcher()
+                                .fails(
+                                        "CREATE TABLE AS SELECT syntax does not support to create temporary table yet."));
+    }
+
+    @Test
+    void testCreateTableAsSelectWithExplicitColumns() {
+        sql("CREATE TABLE t (col1 string) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(
+                        new ValidationMatcher()
+                                .fails(
+                                        "CREATE TABLE AS SELECT syntax does not support to specify explicit columns yet."));
+    }
+
+    @Test
+    void testCreateTableAsSelectWithWatermark() {
+        sql("CREATE TABLE t (watermark FOR ts AS ts - interval '3' second) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(
+                        new ValidationMatcher()
+                                .fails(
+                                        "CREATE TABLE AS SELECT syntax does not support to specify explicit watermark yet."));
+    }
+
+    @Test
+    void testCreateTableAsSelectWithConstraints() {
+        sql("CREATE TABLE t (PRIMARY KEY (col1)) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(
+                        new ValidationMatcher()
+                                .fails(
+                                        "CREATE TABLE AS SELECT syntax does not support primary key constraints yet."));
+    }
+
+    @Test
+    void testCreateTableAsSelectWithPartitionKey() {
+        sql("CREATE TABLE t PARTITIONED BY(col1) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(
+                        new ValidationMatcher()
+                                .fails(
+                                        "CREATE TABLE AS SELECT syntax does not support to create partitioned table yet."));
+    }
+
     public static BaseMatcher<SqlNode> validated(String validatedSql) {
         return new TypeSafeDiagnosingMatcher<SqlNode>() {
             @Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index 28695b013f2..0e6b51d1e3e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.operations;
 
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.ddl.SqlCreateTableLike;
 import org.apache.flink.sql.parser.ddl.SqlTableLike;
 import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
@@ -87,8 +88,8 @@ class SqlCreateTableConverter {
         final List<String> sourcePartitionKeys;
         final List<SqlTableLike.SqlTableLikeOption> likeOptions;
         final Map<String, String> sourceProperties;
-        if (sqlCreateTable.getTableLike().isPresent()) {
-            SqlTableLike sqlTableLike = sqlCreateTable.getTableLike().get();
+        if (sqlCreateTable instanceof SqlCreateTableLike) {
+            SqlTableLike sqlTableLike = ((SqlCreateTableLike) sqlCreateTable).getTableLike();
             CatalogTable table = lookupLikeSourceTable(sqlTableLike);
             sourceTableSchema =
                     TableSchema.fromResolvedSchema(