Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/16 07:58:32 UTC

[GitHub] [flink] lsyldliu commented on a diff in pull request #20252: [FLINK-28463][flink-sql-parser] Flink dialect supports CREATE TABLE AS SELECT(CTAS) syntax

lsyldliu commented on code in PR #20252:
URL: https://github.com/apache/flink/pull/20252#discussion_r922646290


##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java:
##########
@@ -76,6 +76,8 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
 
     private final boolean isTemporary;
 
+    private final SqlNode query;

Review Comment:
   What do you think about introducing a new SqlNode, `SqlCreateTableAs`, which extends `SqlCreate` and implements `ExtendedSqlNode`? This node would hold the `SqlCreateTable` and the `asQuery` as members, so the two functionally different nodes stay separate, which seems clearer.
   Moreover, we may need to support hints in the CTAS syntax, such as `CREATE TABLE ... AS SELECT ... /*+ OPTIONS('key' = 'val') */`.
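
   A minimal sketch of the composition idea, to make the proposal concrete. It deliberately avoids Calcite: `SketchNode`, `SketchCreateTable`, `SketchQuery` and `SketchCreateTableAs` are hypothetical stand-ins for `SqlNode`, `SqlCreateTable`, the parsed query and the proposed `SqlCreateTableAs`, and the real node would extend `SqlCreate` and implement `ExtendedSqlNode` instead.

```java
import java.util.Objects;

/** Stand-in for Calcite's SqlNode; hypothetical and heavily simplified. */
interface SketchNode {
    String unparse();
}

/** Stand-in for the existing SqlCreateTable node (only the table name is modeled). */
final class SketchCreateTable implements SketchNode {
    private final String tableName;

    SketchCreateTable(String tableName) {
        this.tableName = Objects.requireNonNull(tableName, "tableName");
    }

    @Override
    public String unparse() {
        return "CREATE TABLE `" + tableName + "`";
    }
}

/** Stand-in for the parsed SELECT; it simply carries the query text. */
final class SketchQuery implements SketchNode {
    private final String sql;

    SketchQuery(String sql) {
        this.sql = Objects.requireNonNull(sql, "sql");
    }

    @Override
    public String unparse() {
        return sql;
    }
}

/** Hypothetical SqlCreateTableAs: holds the table definition and the AS query as members. */
final class SketchCreateTableAs implements SketchNode {
    private final SketchCreateTable createTable;
    private final SketchNode asQuery;

    SketchCreateTableAs(SketchCreateTable createTable, SketchNode asQuery) {
        this.createTable = Objects.requireNonNull(createTable, "createTable");
        this.asQuery = Objects.requireNonNull(asQuery, "asQuery");
    }

    SketchCreateTable getCreateTable() {
        return createTable;
    }

    SketchNode getAsQuery() {
        return asQuery;
    }

    @Override
    public String unparse() {
        // The table definition is unparsed unchanged; only this node knows about AS.
        return createTable.unparse() + "\nAS\n" + asQuery.unparse();
    }
}
```

   With this shape, the plain CREATE TABLE node needs no nullable `query` field, and CTAS-specific validation can live in the wrapping node.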



##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/CreateTableAsSelectTest.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for parsing and validating CREATE TABLE AS SELECT(CTAS) clause in {@link SqlCreateTable}.
+ */
+class CreateTableAsSelectTest {
+
+    @Test
+    void testNoOptions() throws Exception {
+        SqlNode actualNode = createFlinkParser("CREATE TABLE t AS SELECT * FROM b").parseStmt();
+
+        assertThat(actualNode.toString()).isEqualTo("CREATE TABLE `t`\nAS\nSELECT *\nFROM `b`");
+    }
+
+    @Test
+    void testWithOptions() throws Exception {
+        SqlNode actualNode =
+                createFlinkParser("CREATE TABLE t WITH ('test' = 'zm') AS SELECT * FROM b")
+                        .parseStmt();
+
+        assertThat(actualNode.toString())
+                .isEqualTo("CREATE TABLE `t` WITH (\n  'test' = 'zm'\n)\nAS\nSELECT *\nFROM `b`");
+    }
+
+    @Test
+    void testWithColumns() throws Exception {

Review Comment:
   Suggested test cases:
   
   1. testCreateTableAsSelectWithExplicitColumns
   2. testCreateTableAsSelectWithWatermark
   3. testCreateTableAsSelectWithConstraints
   4. testCreateTableAsSelectWithPartitionKey



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java:
##########
@@ -189,6 +198,15 @@ public void validate() throws SqlValidateException {
         if (tableLike != null) {
             tableLike.validate();
         }
+
+        if (query != null
+                && (columnList.size() > 0

Review Comment:
   I think we should validate these in separate `if` branches and give a clear error message for each case.



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java:
##########
@@ -189,6 +198,15 @@ public void validate() throws SqlValidateException {
         if (tableLike != null) {
             tableLike.validate();
         }
+
+        if (query != null
+                && (columnList.size() > 0
+                        || constraints.size() > 0

Review Comment:
   The following cases are not supported here:
   
   1. The target table is a temporary table
   2. The columns are specified explicitly
   3. The watermark is specified explicitly
   4. The constraints are specified explicitly
   
   As for the partition key, we think we should support it; [Spark](https://spark.apache.org/docs/latest/sql-ref-syntax-ddl-create-table-datasource.html) also supports it.
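
   The cases above could be checked roughly as follows, with one branch and one message per case. This is only a sketch under stated assumptions: `CtasStatement` and `ValidationException` are hypothetical stand-ins (the latter for `SqlValidateException`), and the error messages are illustrative wording, not the messages used in Flink.

```java
import java.util.List;

final class CtasValidationSketch {

    /** Stand-in for Flink's SqlValidateException; hypothetical. */
    static final class ValidationException extends Exception {
        ValidationException(String message) {
            super(message);
        }
    }

    /** Hypothetical flat view of a parsed CREATE TABLE ... AS SELECT statement. */
    static final class CtasStatement {
        final boolean temporary;
        final List<String> columns;
        final boolean hasWatermark;
        final List<String> constraints;
        final List<String> partitionKeys;

        CtasStatement(
                boolean temporary,
                List<String> columns,
                boolean hasWatermark,
                List<String> constraints,
                List<String> partitionKeys) {
            this.temporary = temporary;
            this.columns = columns;
            this.hasWatermark = hasWatermark;
            this.constraints = constraints;
            this.partitionKeys = partitionKeys;
        }
    }

    /** Rejects each unsupported clause in its own branch with its own message. */
    static void validate(CtasStatement stmt) throws ValidationException {
        if (stmt.temporary) {
            throw new ValidationException(
                    "CREATE TABLE AS SELECT does not support temporary tables.");
        }
        if (!stmt.columns.isEmpty()) {
            throw new ValidationException(
                    "CREATE TABLE AS SELECT does not support explicit columns.");
        }
        if (stmt.hasWatermark) {
            throw new ValidationException(
                    "CREATE TABLE AS SELECT does not support an explicit watermark.");
        }
        if (!stmt.constraints.isEmpty()) {
            throw new ValidationException(
                    "CREATE TABLE AS SELECT does not support explicit constraints.");
        }
        // Partition keys are intentionally not rejected.
    }
}
```

   Keeping the branches separate means the user learns exactly which clause is unsupported, instead of getting one generic message for every combination.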



##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/CreateTableAsSelectTest.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for parsing and validating CREATE TABLE AS SELECT(CTAS) clause in {@link SqlCreateTable}.
+ */
+class CreateTableAsSelectTest {

Review Comment:
   I think we can move these tests to `FlinkSqlParserImplTest` so that we can reuse its code. By the way, for the illegal-statement tests, we can use `sql(sql).fails`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.