You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yu...@apache.org on 2023/07/04 08:25:49 UTC

[flink] branch master updated: [FLINK-32516][table] Support to parse [CREATE OR ] REPLACE TABLE AS statement (#22934)

This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 76d2a8d0deb [FLINK-32516][table] Support to parse [CREATE OR ] REPLACE TABLE AS statement (#22934)
76d2a8d0deb is described below

commit 76d2a8d0deb8a81a98ed82b6c0613f79bf2a800c
Author: zhangmang <zh...@163.com>
AuthorDate: Tue Jul 4 16:25:42 2023 +0800

    [FLINK-32516][table] Support to parse [CREATE OR ] REPLACE TABLE AS statement (#22934)
---
 .../src/main/codegen/data/Parser.tdd               |   2 +
 .../src/main/codegen/includes/parserImpls.ftl      | 108 +++++++-
 .../flink/sql/parser/ddl/SqlReplaceTableAs.java    | 273 +++++++++++++++++++++
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 111 +++++++++
 4 files changed, 483 insertions(+), 11 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 49f94a64262..3e04377e47e 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -69,6 +69,7 @@
     "org.apache.flink.sql.parser.ddl.SqlDropTable"
     "org.apache.flink.sql.parser.ddl.SqlDropView"
     "org.apache.flink.sql.parser.ddl.SqlRemoveJar"
+    "org.apache.flink.sql.parser.ddl.SqlReplaceTableAs"
     "org.apache.flink.sql.parser.ddl.SqlReset"
     "org.apache.flink.sql.parser.ddl.SqlSet"
     "org.apache.flink.sql.parser.ddl.SqlTableColumn"
@@ -562,6 +563,7 @@
     "SqlShowTables()"
     "SqlShowColumns()"
     "SqlShowCreate()"
+    "SqlReplaceTable()"
     "SqlRichDescribeTable()"
     "SqlAlterTable()"
     "SqlAlterView()"
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 7f6fc56b2aa..27b0307172f 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1336,17 +1336,32 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
         <AS>
         asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
         {
-            return new SqlCreateTableAs(startPos.plus(getPos()),
-                tableName,
-                columnList,
-                constraints,
-                propertyList,
-                partitionColumns,
-                watermark,
-                comment,
-                asQuery,
-                isTemporary,
-                ifNotExists);
+            if (replace) {
+                return new SqlReplaceTableAs(startPos.plus(getPos()),
+                    tableName,
+                    columnList,
+                    constraints,
+                    propertyList,
+                    partitionColumns,
+                    watermark,
+                    comment,
+                    asQuery,
+                    isTemporary,
+                    ifNotExists,
+                    true);
+            } else {
+                return new SqlCreateTableAs(startPos.plus(getPos()),
+                    tableName,
+                    columnList,
+                    constraints,
+                    propertyList,
+                    partitionColumns,
+                    watermark,
+                    comment,
+                    asQuery,
+                    isTemporary,
+                    ifNotExists);
+            }
         }
     ]
     {
@@ -1441,6 +1456,77 @@ SqlDrop SqlDropTable(Span s, boolean replace, boolean isTemporary) :
     }
 }
 
+/**
+* Parser a REPLACE TABLE AS statement
+*/
+SqlNode SqlReplaceTable() :
+{
+    SqlIdentifier tableName;
+    SqlCharStringLiteral comment = null;
+    SqlNode asQuery = null;
+    SqlNodeList propertyList = SqlNodeList.EMPTY;
+    SqlParserPos pos;
+    boolean isTemporary = false;
+    List<SqlTableConstraint> constraints = new ArrayList<SqlTableConstraint>();
+    SqlWatermark watermark = null;
+    SqlNodeList columnList = SqlNodeList.EMPTY;
+    SqlNodeList partitionColumns = SqlNodeList.EMPTY;
+    boolean ifNotExists = false;
+}
+{
+    <REPLACE>
+    [
+        <TEMPORARY> { isTemporary = true; }
+    ]
+    <TABLE>
+
+    ifNotExists = IfNotExistsOpt()
+
+    tableName = CompoundIdentifier()
+    [
+        <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
+        TableColumn(ctx)
+        (
+            <COMMA> TableColumn(ctx)
+        )*
+        {
+            pos = getPos();
+            columnList = new SqlNodeList(ctx.columnList, pos);
+            constraints = ctx.constraints;
+            watermark = ctx.watermark;
+        }
+        <RPAREN>
+    ]
+    [ <COMMENT> <QUOTED_STRING> {
+        String p = SqlParserUtil.parseString(token.image);
+        comment = SqlLiteral.createCharString(p, getPos());
+    }]
+    [
+        <PARTITIONED> <BY>
+        partitionColumns = ParenthesizedSimpleIdentifierList()
+    ]
+    [
+        <WITH>
+        propertyList = TableProperties()
+    ]
+    <AS>
+    asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
+    {
+        return new SqlReplaceTableAs(getPos(),
+            tableName,
+            columnList,
+            constraints,
+            propertyList,
+            partitionColumns,
+            watermark,
+            comment,
+            asQuery,
+            isTemporary,
+            ifNotExists,
+            false);
+    }
+}
+
 /**
 * Parses an INSERT statement.
 */
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
new file mode 100644
index 00000000000..472dc28535b
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.SqlConstraintValidator;
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link SqlNode} to describe the [CREATE OR] REPLACE TABLE AS (RTAS) syntax. The RTAS would create
+ * a pipeline to compute the result of the given query and create or replace the derived table.
+ *
+ * <p>Notes: REPLACE TABLE AS: the derived table must exist. CREATE OR REPLACE TABLE AS: create the
+ * derived table if it does not exist, otherwise replace it.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * CREATE TABLE base_table (
+ *     id BIGINT,
+ *     name STRING,
+ *     time TIMESTAMP,
+ *     PRIMARY KEY(id)
+ * ) WITH (
+ *     ‘connector’ = ‘kafka’,
+ *     ‘connector.starting-offset’: ‘12345’,
+ *     ‘format’ =  ‘json’
+ * )
+ *
+ * CREATE OR REPLACE TABLE derived_table
+ * WITH (
+ *   'connector' = 'jdbc',
+ *   'url' = 'http://localhost:10000',
+ *   'table-name' = 'syncedTable'
+ * )
+ * AS SELECT * FROM base_table;
+ * }</pre>
+ */
+public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
+
+    public static final SqlSpecialOperator REPLACE_OPERATOR =
+            new SqlSpecialOperator("REPLACE TABLE AS", SqlKind.OTHER_DDL);
+
+    public static final SqlSpecialOperator CREATE_OR_REPLACE_OPERATOR =
+            new SqlSpecialOperator("CREATE OR REPLACE TABLE AS", SqlKind.OTHER_DDL);
+
+    private final SqlIdentifier tableName;
+
+    private final SqlNodeList columnList;
+
+    private final SqlNodeList propertyList;
+
+    private final List<SqlTableConstraint> tableConstraints;
+
+    private final SqlNodeList partitionKeyList;
+
+    private final SqlWatermark watermark;
+
+    private final SqlCharStringLiteral comment;
+
+    private final boolean isTemporary;
+
+    private final boolean isCreateOrReplace;
+
+    private final SqlNode asQuery;
+
+    public SqlReplaceTableAs(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList columnList,
+            List<SqlTableConstraint> tableConstraints,
+            SqlNodeList propertyList,
+            SqlNodeList partitionKeyList,
+            @Nullable SqlWatermark watermark,
+            @Nullable SqlCharStringLiteral comment,
+            SqlNode asQuery,
+            boolean isTemporary,
+            boolean ifNotExists,
+            boolean isCreateOrReplace) {
+        super(
+                isCreateOrReplace ? CREATE_OR_REPLACE_OPERATOR : REPLACE_OPERATOR,
+                pos,
+                true,
+                ifNotExists);
+
+        this.tableName = requireNonNull(tableName, "tableName should not be null");
+        this.columnList = requireNonNull(columnList, "columnList should not be null");
+        this.tableConstraints =
+                requireNonNull(tableConstraints, "table constraints should not be null");
+        this.propertyList = requireNonNull(propertyList, "propertyList should not be null");
+        this.partitionKeyList =
+                requireNonNull(partitionKeyList, "partitionKeyList should not be null");
+        this.watermark = watermark;
+        this.comment = comment;
+        this.isTemporary = isTemporary;
+
+        this.asQuery = asQuery;
+        this.isCreateOrReplace = isCreateOrReplace;
+    }
+
+    @Override
+    public @Nonnull List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(
+                tableName,
+                columnList,
+                new SqlNodeList(tableConstraints, SqlParserPos.ZERO),
+                propertyList,
+                partitionKeyList,
+                watermark,
+                comment,
+                asQuery);
+    }
+
+    @Override
+    public void validate() throws SqlValidateException {
+        SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, columnList);
+        // The following features are not currently supported by RTAS, but may be supported in the
+        // future
+        String errorMsg =
+                isCreateOrReplace ? "CREATE OR REPLACE TABLE AS SELECT" : "REPLACE TABLE AS SELECT";
+
+        if (isIfNotExists()) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support IF NOT EXISTS statements yet.");
+        }
+
+        if (isTemporary()) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support temporary table yet.");
+        }
+
+        if (getColumnList().size() > 0) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support to specify explicit columns yet.");
+        }
+
+        if (getWatermark().isPresent()) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support to specify explicit watermark yet.");
+        }
+        if (getPartitionKeyList().size() > 0) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support to create partitioned table yet.");
+        }
+        if (getFullConstraints().stream().anyMatch(SqlTableConstraint::isPrimaryKey)) {
+            throw new SqlValidateException(
+                    getParserPosition(),
+                    errorMsg + " syntax does not support primary key constraints yet.");
+        }
+    }
+
+    public SqlNode getAsQuery() {
+        return asQuery;
+    }
+
+    public boolean isCreateOrReplace() {
+        return isCreateOrReplace;
+    }
+
+    public SqlIdentifier getTableName() {
+        return tableName;
+    }
+
+    public SqlNodeList getColumnList() {
+        return columnList;
+    }
+
+    public SqlNodeList getPropertyList() {
+        return propertyList;
+    }
+
+    public SqlNodeList getPartitionKeyList() {
+        return partitionKeyList;
+    }
+
+    public List<SqlTableConstraint> getTableConstraints() {
+        return tableConstraints;
+    }
+
+    public Optional<SqlWatermark> getWatermark() {
+        return Optional.ofNullable(watermark);
+    }
+
+    public Optional<SqlCharStringLiteral> getComment() {
+        return Optional.ofNullable(comment);
+    }
+
+    public boolean isIfNotExists() {
+        return ifNotExists;
+    }
+
+    public boolean isTemporary() {
+        return isTemporary;
+    }
+
+    /** Returns the column constraints plus the table constraints. */
+    public List<SqlTableConstraint> getFullConstraints() {
+        return SqlConstraintValidator.getFullConstraints(tableConstraints, columnList);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        if (isCreateOrReplace) {
+            writer.keyword("CREATE OR");
+        }
+        writer.keyword("REPLACE TABLE");
+        tableName.unparse(writer, leftPrec, rightPrec);
+
+        if (comment != null) {
+            writer.newlineAndIndent();
+            writer.keyword("COMMENT");
+            comment.unparse(writer, leftPrec, rightPrec);
+        }
+
+        if (this.propertyList.size() > 0) {
+            writer.keyword("WITH");
+            SqlWriter.Frame withFrame = writer.startList("(", ")");
+            for (SqlNode property : propertyList) {
+                SqlUnparseUtils.printIndent(writer);
+                property.unparse(writer, leftPrec, rightPrec);
+            }
+            writer.newlineAndIndent();
+            writer.endList(withFrame);
+        }
+
+        writer.newlineAndIndent();
+        writer.keyword("AS");
+        writer.newlineAndIndent();
+        this.asQuery.unparse(writer, leftPrec, rightPrec);
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 5e9842bd865..0958e6c58d5 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -2524,6 +2524,117 @@ class FlinkSqlParserImplTest extends SqlParserTest {
                                         "CREATE TABLE AS SELECT syntax does not support to create partitioned table yet."));
     }
 
+    @Test
+    void testReplaceTableAsSelect() {
+        // test replace table as select without options
+        sql("REPLACE TABLE t AS SELECT * FROM b").ok("REPLACE TABLE `T`\nAS\nSELECT *\nFROM `B`");
+
+        // test replace table as select with options
+        sql("REPLACE TABLE t WITH ('test' = 'zm') AS SELECT * FROM b")
+                .ok("REPLACE TABLE `T` WITH (\n  'test' = 'zm'\n)\nAS\nSELECT *\nFROM `B`");
+
+        // test replace table as select with tmp table
+        sql("REPLACE TEMPORARY TABLE t (col1 string) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(
+                        new ValidationMatcher()
+                                .fails(
+                                        "REPLACE TABLE AS SELECT syntax does not support temporary table yet."));
+
+        // test replace table as select with explicit columns
+        sql("REPLACE TABLE t (col1 string) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(
+                        new ValidationMatcher()
+                                .fails(
+                                        "REPLACE TABLE AS SELECT syntax does not support to specify explicit columns yet."));
+
+        // test replace table as select with watermark
+        sql("REPLACE TABLE t (watermark FOR ts AS ts - interval '3' second) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(
+                        new ValidationMatcher()
+                                .fails(
+                                        "REPLACE TABLE AS SELECT syntax does not support to specify explicit watermark yet."));
+
+        // test replace table as select with constraints
+        sql("REPLACE TABLE t (PRIMARY KEY (col1)) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(
+                        new ValidationMatcher()
+                                .fails(
+                                        "Flink doesn't support ENFORCED mode for PRIMARY KEY constraint. ENFORCED/NOT ENFORCED controls "
+                                                + "if the constraint checks are performed on the incoming/outgoing data. "
+                                                + "Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode"));
+
+        sql("REPLACE TABLE t (PRIMARY KEY (col1), PRIMARY KEY (col2) NOT ENFORCED) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(new ValidationMatcher().fails("Duplicate primary key definition"));
+
+        sql("REPLACE TABLE t (UNIQUE (col1)) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(new ValidationMatcher().fails("UNIQUE constraint is not supported yet"));
+
+        // test replace table as select with partition key
+        sql("REPLACE TABLE t PARTITIONED BY(col1) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(
+                        new ValidationMatcher()
+                                .fails(
+                                        "REPLACE TABLE AS SELECT syntax does not support to create partitioned table yet."));
+    }
+
+    @Test
+    void testCreateOrReplaceTableAsSelect() {
+        // test create or replace table as select without options
+        sql("CREATE OR REPLACE TABLE t AS SELECT * FROM b")
+                .ok("CREATE OR REPLACE TABLE `T`\nAS\nSELECT *\nFROM `B`");
+
+        // test create or replace table as select with options
+        sql("CREATE OR REPLACE TABLE t WITH ('test' = 'zm') AS SELECT * FROM b")
+                .ok(
+                        "CREATE OR REPLACE TABLE `T` WITH (\n  'test' = 'zm'\n)\nAS\nSELECT *\nFROM `B`");
+
+        // test create or replace table as select with create table like
+        sql("CREATE OR REPLACE TABLE t (col1 string) WITH ('test' = 'zm') like b ^AS^ SELECT col1 FROM b")
+                .fails("(?s).*Encountered \"AS\" at line 1, column 69.*");
+
+        // test create or replace table as select with tmp table
+        sql("CREATE OR REPLACE TEMPORARY TABLE t (col1 string) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(
+                        new ValidationMatcher()
+                                .fails(
+                                        "CREATE OR REPLACE TABLE AS SELECT syntax does not support temporary table yet."));
+
+        // test create or replace table as select with explicit columns
+        sql("CREATE OR REPLACE TABLE t (col1 string) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(
+                        new ValidationMatcher()
+                                .fails(
+                                        "CREATE OR REPLACE TABLE AS SELECT syntax does not support to specify explicit columns yet."));
+
+        // test create or replace table as select with watermark
+        sql("CREATE OR REPLACE TABLE t (watermark FOR ts AS ts - interval '3' second) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(
+                        new ValidationMatcher()
+                                .fails(
+                                        "CREATE OR REPLACE TABLE AS SELECT syntax does not support to specify explicit watermark yet."));
+        // test create or replace table as select with constraints
+        sql("CREATE OR REPLACE TABLE t (PRIMARY KEY (col1)) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(
+                        new ValidationMatcher()
+                                .fails(
+                                        "Flink doesn't support ENFORCED mode for PRIMARY KEY constraint. ENFORCED/NOT ENFORCED controls "
+                                                + "if the constraint checks are performed on the incoming/outgoing data. "
+                                                + "Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode"));
+
+        sql("CREATE OR REPLACE TABLE t (PRIMARY KEY (col1), PRIMARY KEY (col2) NOT ENFORCED) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(new ValidationMatcher().fails("Duplicate primary key definition"));
+
+        sql("CREATE OR REPLACE TABLE t (UNIQUE (col1)) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(new ValidationMatcher().fails("UNIQUE constraint is not supported yet"));
+
+        // test create or replace table as select with partition key
+        sql("CREATE OR REPLACE TABLE t PARTITIONED BY(col1) WITH ('test' = 'zm') AS SELECT col1 FROM b")
+                .node(
+                        new ValidationMatcher()
+                                .fails(
+                                        "CREATE OR REPLACE TABLE AS SELECT syntax does not support to create partitioned table yet."));
+    }
+
     @Test
     void testShowJobs() {
         sql("show jobs").ok("SHOW JOBS");