You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yu...@apache.org on 2023/07/04 08:25:49 UTC
[flink] branch master updated: [FLINK-32516][table] Support to parse [CREATE OR ] REPLACE TABLE AS statement (#22934)
This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 76d2a8d0deb [FLINK-32516][table] Support to parse [CREATE OR ] REPLACE TABLE AS statement (#22934)
76d2a8d0deb is described below
commit 76d2a8d0deb8a81a98ed82b6c0613f79bf2a800c
Author: zhangmang <zh...@163.com>
AuthorDate: Tue Jul 4 16:25:42 2023 +0800
[FLINK-32516][table] Support to parse [CREATE OR ] REPLACE TABLE AS statement (#22934)
---
.../src/main/codegen/data/Parser.tdd | 2 +
.../src/main/codegen/includes/parserImpls.ftl | 108 +++++++-
.../flink/sql/parser/ddl/SqlReplaceTableAs.java | 273 +++++++++++++++++++++
.../flink/sql/parser/FlinkSqlParserImplTest.java | 111 +++++++++
4 files changed, 483 insertions(+), 11 deletions(-)
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 49f94a64262..3e04377e47e 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -69,6 +69,7 @@
"org.apache.flink.sql.parser.ddl.SqlDropTable"
"org.apache.flink.sql.parser.ddl.SqlDropView"
"org.apache.flink.sql.parser.ddl.SqlRemoveJar"
+ "org.apache.flink.sql.parser.ddl.SqlReplaceTableAs"
"org.apache.flink.sql.parser.ddl.SqlReset"
"org.apache.flink.sql.parser.ddl.SqlSet"
"org.apache.flink.sql.parser.ddl.SqlTableColumn"
@@ -562,6 +563,7 @@
"SqlShowTables()"
"SqlShowColumns()"
"SqlShowCreate()"
+ "SqlReplaceTable()"
"SqlRichDescribeTable()"
"SqlAlterTable()"
"SqlAlterView()"
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 7f6fc56b2aa..27b0307172f 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1336,17 +1336,32 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
<AS>
asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
{
- return new SqlCreateTableAs(startPos.plus(getPos()),
- tableName,
- columnList,
- constraints,
- propertyList,
- partitionColumns,
- watermark,
- comment,
- asQuery,
- isTemporary,
- ifNotExists);
+ if (replace) {
+ return new SqlReplaceTableAs(startPos.plus(getPos()),
+ tableName,
+ columnList,
+ constraints,
+ propertyList,
+ partitionColumns,
+ watermark,
+ comment,
+ asQuery,
+ isTemporary,
+ ifNotExists,
+ true);
+ } else {
+ return new SqlCreateTableAs(startPos.plus(getPos()),
+ tableName,
+ columnList,
+ constraints,
+ propertyList,
+ partitionColumns,
+ watermark,
+ comment,
+ asQuery,
+ isTemporary,
+ ifNotExists);
+ }
}
]
{
@@ -1441,6 +1456,77 @@ SqlDrop SqlDropTable(Span s, boolean replace, boolean isTemporary) :
}
}
+/**
+* Parser a REPLACE TABLE AS statement
+*/
+SqlNode SqlReplaceTable() :
+{
+ SqlIdentifier tableName;
+ SqlCharStringLiteral comment = null;
+ SqlNode asQuery = null;
+ SqlNodeList propertyList = SqlNodeList.EMPTY;
+ SqlParserPos pos;
+ boolean isTemporary = false;
+ List<SqlTableConstraint> constraints = new ArrayList<SqlTableConstraint>();
+ SqlWatermark watermark = null;
+ SqlNodeList columnList = SqlNodeList.EMPTY;
+ SqlNodeList partitionColumns = SqlNodeList.EMPTY;
+ boolean ifNotExists = false;
+}
+{
+ <REPLACE>
+ [
+ <TEMPORARY> { isTemporary = true; }
+ ]
+ <TABLE>
+
+ ifNotExists = IfNotExistsOpt()
+
+ tableName = CompoundIdentifier()
+ [
+ <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
+ TableColumn(ctx)
+ (
+ <COMMA> TableColumn(ctx)
+ )*
+ {
+ pos = getPos();
+ columnList = new SqlNodeList(ctx.columnList, pos);
+ constraints = ctx.constraints;
+ watermark = ctx.watermark;
+ }
+ <RPAREN>
+ ]
+ [ <COMMENT> <QUOTED_STRING> {
+ String p = SqlParserUtil.parseString(token.image);
+ comment = SqlLiteral.createCharString(p, getPos());
+ }]
+ [
+ <PARTITIONED> <BY>
+ partitionColumns = ParenthesizedSimpleIdentifierList()
+ ]
+ [
+ <WITH>
+ propertyList = TableProperties()
+ ]
+ <AS>
+ asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
+ {
+ return new SqlReplaceTableAs(getPos(),
+ tableName,
+ columnList,
+ constraints,
+ propertyList,
+ partitionColumns,
+ watermark,
+ comment,
+ asQuery,
+ isTemporary,
+ ifNotExists,
+ false);
+ }
+}
+
/**
* Parses an INSERT statement.
*/
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
new file mode 100644
index 00000000000..472dc28535b
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.SqlConstraintValidator;
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link SqlNode} to describe the [CREATE OR] REPLACE TABLE AS (RTAS) syntax. The RTAS would create
+ * a pipeline to compute the result of the given query and create or replace the derived table.
+ *
+ * <p>Notes: REPLACE TABLE AS: the derived table must exist. CREATE OR REPLACE TABLE AS: create the
+ * derived table if it does not exist, otherwise replace it.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * CREATE TABLE base_table (
+ * id BIGINT,
+ * name STRING,
+ * time TIMESTAMP,
+ * PRIMARY KEY(id)
+ * ) WITH (
+ * ‘connector’ = ‘kafka’,
+ * ‘connector.starting-offset’: ‘12345’,
+ * ‘format’ = ‘json’
+ * )
+ *
+ * CREATE OR REPLACE TABLE derived_table
+ * WITH (
+ * 'connector' = 'jdbc',
+ * 'url' = 'http://localhost:10000',
+ * 'table-name' = 'syncedTable'
+ * )
+ * AS SELECT * FROM base_table;
+ * }</pre>
+ */
+public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
+
+ public static final SqlSpecialOperator REPLACE_OPERATOR =
+ new SqlSpecialOperator("REPLACE TABLE AS", SqlKind.OTHER_DDL);
+
+ public static final SqlSpecialOperator CREATE_OR_REPLACE_OPERATOR =
+ new SqlSpecialOperator("CREATE OR REPLACE TABLE AS", SqlKind.OTHER_DDL);
+
+ private final SqlIdentifier tableName;
+
+ private final SqlNodeList columnList;
+
+ private final SqlNodeList propertyList;
+
+ private final List<SqlTableConstraint> tableConstraints;
+
+ private final SqlNodeList partitionKeyList;
+
+ private final SqlWatermark watermark;
+
+ private final SqlCharStringLiteral comment;
+
+ private final boolean isTemporary;
+
+ private final boolean isCreateOrReplace;
+
+ private final SqlNode asQuery;
+
+ public SqlReplaceTableAs(
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ SqlNodeList columnList,
+ List<SqlTableConstraint> tableConstraints,
+ SqlNodeList propertyList,
+ SqlNodeList partitionKeyList,
+ @Nullable SqlWatermark watermark,
+ @Nullable SqlCharStringLiteral comment,
+ SqlNode asQuery,
+ boolean isTemporary,
+ boolean ifNotExists,
+ boolean isCreateOrReplace) {
+ super(
+ isCreateOrReplace ? CREATE_OR_REPLACE_OPERATOR : REPLACE_OPERATOR,
+ pos,
+ true,
+ ifNotExists);
+
+ this.tableName = requireNonNull(tableName, "tableName should not be null");
+ this.columnList = requireNonNull(columnList, "columnList should not be null");
+ this.tableConstraints =
+ requireNonNull(tableConstraints, "table constraints should not be null");
+ this.propertyList = requireNonNull(propertyList, "propertyList should not be null");
+ this.partitionKeyList =
+ requireNonNull(partitionKeyList, "partitionKeyList should not be null");
+ this.watermark = watermark;
+ this.comment = comment;
+ this.isTemporary = isTemporary;
+
+ this.asQuery = asQuery;
+ this.isCreateOrReplace = isCreateOrReplace;
+ }
+
+ @Override
+ public @Nonnull List<SqlNode> getOperandList() {
+ return ImmutableNullableList.of(
+ tableName,
+ columnList,
+ new SqlNodeList(tableConstraints, SqlParserPos.ZERO),
+ propertyList,
+ partitionKeyList,
+ watermark,
+ comment,
+ asQuery);
+ }
+
+ @Override
+ public void validate() throws SqlValidateException {
+ SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, columnList);
+ // The following features are not currently supported by RTAS, but may be supported in the
+ // future
+ String errorMsg =
+ isCreateOrReplace ? "CREATE OR REPLACE TABLE AS SELECT" : "REPLACE TABLE AS SELECT";
+
+ if (isIfNotExists()) {
+ throw new SqlValidateException(
+ getParserPosition(),
+ errorMsg + " syntax does not support IF NOT EXISTS statements yet.");
+ }
+
+ if (isTemporary()) {
+ throw new SqlValidateException(
+ getParserPosition(),
+ errorMsg + " syntax does not support temporary table yet.");
+ }
+
+ if (getColumnList().size() > 0) {
+ throw new SqlValidateException(
+ getParserPosition(),
+ errorMsg + " syntax does not support to specify explicit columns yet.");
+ }
+
+ if (getWatermark().isPresent()) {
+ throw new SqlValidateException(
+ getParserPosition(),
+ errorMsg + " syntax does not support to specify explicit watermark yet.");
+ }
+ if (getPartitionKeyList().size() > 0) {
+ throw new SqlValidateException(
+ getParserPosition(),
+ errorMsg + " syntax does not support to create partitioned table yet.");
+ }
+ if (getFullConstraints().stream().anyMatch(SqlTableConstraint::isPrimaryKey)) {
+ throw new SqlValidateException(
+ getParserPosition(),
+ errorMsg + " syntax does not support primary key constraints yet.");
+ }
+ }
+
+ public SqlNode getAsQuery() {
+ return asQuery;
+ }
+
+ public boolean isCreateOrReplace() {
+ return isCreateOrReplace;
+ }
+
+ public SqlIdentifier getTableName() {
+ return tableName;
+ }
+
+ public SqlNodeList getColumnList() {
+ return columnList;
+ }
+
+ public SqlNodeList getPropertyList() {
+ return propertyList;
+ }
+
+ public SqlNodeList getPartitionKeyList() {
+ return partitionKeyList;
+ }
+
+ public List<SqlTableConstraint> getTableConstraints() {
+ return tableConstraints;
+ }
+
+ public Optional<SqlWatermark> getWatermark() {
+ return Optional.ofNullable(watermark);
+ }
+
+ public Optional<SqlCharStringLiteral> getComment() {
+ return Optional.ofNullable(comment);
+ }
+
+ public boolean isIfNotExists() {
+ return ifNotExists;
+ }
+
+ public boolean isTemporary() {
+ return isTemporary;
+ }
+
+ /** Returns the column constraints plus the table constraints. */
+ public List<SqlTableConstraint> getFullConstraints() {
+ return SqlConstraintValidator.getFullConstraints(tableConstraints, columnList);
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ if (isCreateOrReplace) {
+ writer.keyword("CREATE OR");
+ }
+ writer.keyword("REPLACE TABLE");
+ tableName.unparse(writer, leftPrec, rightPrec);
+
+ if (comment != null) {
+ writer.newlineAndIndent();
+ writer.keyword("COMMENT");
+ comment.unparse(writer, leftPrec, rightPrec);
+ }
+
+ if (this.propertyList.size() > 0) {
+ writer.keyword("WITH");
+ SqlWriter.Frame withFrame = writer.startList("(", ")");
+ for (SqlNode property : propertyList) {
+ SqlUnparseUtils.printIndent(writer);
+ property.unparse(writer, leftPrec, rightPrec);
+ }
+ writer.newlineAndIndent();
+ writer.endList(withFrame);
+ }
+
+ writer.newlineAndIndent();
+ writer.keyword("AS");
+ writer.newlineAndIndent();
+ this.asQuery.unparse(writer, leftPrec, rightPrec);
+ }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 5e9842bd865..0958e6c58d5 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -2524,6 +2524,117 @@ class FlinkSqlParserImplTest extends SqlParserTest {
"CREATE TABLE AS SELECT syntax does not support to create partitioned table yet."));
}
+ @Test
+ void testReplaceTableAsSelect() {
+ // test replace table as select without options
+ sql("REPLACE TABLE t AS SELECT * FROM b").ok("REPLACE TABLE `T`\nAS\nSELECT *\nFROM `B`");
+
+ // test replace table as select with options
+ sql("REPLACE TABLE t WITH ('test' = 'zm') AS SELECT * FROM b")
+ .ok("REPLACE TABLE `T` WITH (\n 'test' = 'zm'\n)\nAS\nSELECT *\nFROM `B`");
+
+ // test replace table as select with tmp table
+ sql("REPLACE TEMPORARY TABLE t (col1 string) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(
+ new ValidationMatcher()
+ .fails(
+ "REPLACE TABLE AS SELECT syntax does not support temporary table yet."));
+
+ // test replace table as select with explicit columns
+ sql("REPLACE TABLE t (col1 string) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(
+ new ValidationMatcher()
+ .fails(
+ "REPLACE TABLE AS SELECT syntax does not support to specify explicit columns yet."));
+
+ // test replace table as select with watermark
+ sql("REPLACE TABLE t (watermark FOR ts AS ts - interval '3' second) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(
+ new ValidationMatcher()
+ .fails(
+ "REPLACE TABLE AS SELECT syntax does not support to specify explicit watermark yet."));
+
+ // test replace table as select with constraints
+ sql("REPLACE TABLE t (PRIMARY KEY (col1)) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(
+ new ValidationMatcher()
+ .fails(
+ "Flink doesn't support ENFORCED mode for PRIMARY KEY constraint. ENFORCED/NOT ENFORCED controls "
+ + "if the constraint checks are performed on the incoming/outgoing data. "
+ + "Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode"));
+
+ sql("REPLACE TABLE t (PRIMARY KEY (col1), PRIMARY KEY (col2) NOT ENFORCED) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(new ValidationMatcher().fails("Duplicate primary key definition"));
+
+ sql("REPLACE TABLE t (UNIQUE (col1)) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(new ValidationMatcher().fails("UNIQUE constraint is not supported yet"));
+
+ // test replace table as select with partition key
+ sql("REPLACE TABLE t PARTITIONED BY(col1) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(
+ new ValidationMatcher()
+ .fails(
+ "REPLACE TABLE AS SELECT syntax does not support to create partitioned table yet."));
+ }
+
+ @Test
+ void testCreateOrReplaceTableAsSelect() {
+ // test create or replace table as select without options
+ sql("CREATE OR REPLACE TABLE t AS SELECT * FROM b")
+ .ok("CREATE OR REPLACE TABLE `T`\nAS\nSELECT *\nFROM `B`");
+
+ // test create or replace table as select with options
+ sql("CREATE OR REPLACE TABLE t WITH ('test' = 'zm') AS SELECT * FROM b")
+ .ok(
+ "CREATE OR REPLACE TABLE `T` WITH (\n 'test' = 'zm'\n)\nAS\nSELECT *\nFROM `B`");
+
+ // test create or replace table as select with create table like
+ sql("CREATE OR REPLACE TABLE t (col1 string) WITH ('test' = 'zm') like b ^AS^ SELECT col1 FROM b")
+ .fails("(?s).*Encountered \"AS\" at line 1, column 69.*");
+
+ // test create or replace table as select with tmp table
+ sql("CREATE OR REPLACE TEMPORARY TABLE t (col1 string) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(
+ new ValidationMatcher()
+ .fails(
+ "CREATE OR REPLACE TABLE AS SELECT syntax does not support temporary table yet."));
+
+ // test create or replace table as select with explicit columns
+ sql("CREATE OR REPLACE TABLE t (col1 string) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(
+ new ValidationMatcher()
+ .fails(
+ "CREATE OR REPLACE TABLE AS SELECT syntax does not support to specify explicit columns yet."));
+
+ // test create or replace table as select with watermark
+ sql("CREATE OR REPLACE TABLE t (watermark FOR ts AS ts - interval '3' second) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(
+ new ValidationMatcher()
+ .fails(
+ "CREATE OR REPLACE TABLE AS SELECT syntax does not support to specify explicit watermark yet."));
+ // test create or replace table as select with constraints
+ sql("CREATE OR REPLACE TABLE t (PRIMARY KEY (col1)) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(
+ new ValidationMatcher()
+ .fails(
+ "Flink doesn't support ENFORCED mode for PRIMARY KEY constraint. ENFORCED/NOT ENFORCED controls "
+ + "if the constraint checks are performed on the incoming/outgoing data. "
+ + "Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode"));
+
+ sql("CREATE OR REPLACE TABLE t (PRIMARY KEY (col1), PRIMARY KEY (col2) NOT ENFORCED) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(new ValidationMatcher().fails("Duplicate primary key definition"));
+
+ sql("CREATE OR REPLACE TABLE t (UNIQUE (col1)) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(new ValidationMatcher().fails("UNIQUE constraint is not supported yet"));
+
+ // test create or replace table as select with partition key
+ sql("CREATE OR REPLACE TABLE t PARTITIONED BY(col1) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+ .node(
+ new ValidationMatcher()
+ .fails(
+ "CREATE OR REPLACE TABLE AS SELECT syntax does not support to create partitioned table yet."));
+ }
+
@Test
void testShowJobs() {
sql("show jobs").ok("SHOW JOBS");