Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/03 09:57:00 UTC

[GitHub] [flink] slinkydeveloper commented on a change in pull request #18215: [FLINK-25392][table-planner]Support new StatementSet syntax in planner and parser

slinkydeveloper commented on a change in pull request #18215:
URL: https://github.com/apache/flink/pull/18215#discussion_r777368297



##########
File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dml/SqlExecute.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dml;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * SqlExecute contains a statement to execute. The statement can be {@link SqlSelect}, {@link
+ * SqlStatementSet}, or {@link RichSqlInsert}.

Review comment:
       Can you add a SQL statement example here?
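    
    For instance, a sketch based on the `EXECUTE` syntax exercised by the tests in this PR (tables and columns borrowed from those tests):
    
    ```
    -- sketch; tables/columns are the ones used in the tests of this PR
    EXECUTE INSERT INTO MySink SELECT first FROM MyTable;
    
    EXECUTE STATEMENT SET BEGIN
      INSERT INTO MySink SELECT first FROM MyTable;
      INSERT INTO MySink SELECT last FROM MyTable;
    END;
    ```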

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java
##########
@@ -23,22 +23,33 @@
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.planner.utils.PlannerMocks;
 
-import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 /** Test for {@link FlinkCalciteSqlValidator}. */
 public class FlinkCalciteSqlValidatorTest {
 
+    @Rule public final ExpectedException thrown = ExpectedException.none();
+
     private final PlannerMocks plannerMocks =
             PlannerMocks.create()
                     .registerTemporaryTable(
                             "t1", Schema.newBuilder().column("a", DataTypes.INT()).build());
 
     @Test
     public void testUpsertInto() {
-        Assert.assertThrows(
-                "UPSERT INTO statement is not supported. Please use INSERT INTO instead.",
-                ValidationException.class,
-                () -> plannerMocks.getParser().parse("UPSERT INTO t1 VALUES(1)"));
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage(
+                "UPSERT INTO statement is not supported. Please use INSERT INTO instead.");
+        plannerMocks.getParser().parse("UPSERT INTO t1 VALUES(1)");
+    }
+
+    @Test
+    public void testExplainUpsertInto() {
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage(
+                "UPSERT INTO statement is not supported. Please use INSERT INTO instead.");
+        plannerMocks.getParser().parse("EXPLAIN UPSERT INTO t1 VALUES(1)");

Review comment:
       Please use assertj and remove `ExpectedException`:
   
   ```
   assertThatThrownBy(() -> plannerMocks.getParser().parse("EXPLAIN UPSERT INTO t1 VALUES(1)"))
     .isInstanceOf(ValidationException.class)
     .hasMessage("UPSERT INTO statement is not supported. Please use INSERT INTO instead.");
   ```

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
##########
@@ -107,6 +107,22 @@ class TableEnvironmentTest {
     assertEquals(TableTestUtil.replaceStageId(expected), TableTestUtil.replaceStageId(actual))
   }
 
+  @Test
+  def testExplainWithExecuteInsert(): Unit = {
+    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
+    val tEnv = StreamTableEnvironment.create(execEnv, settings)
+
+    TestTableSourceSinks.createPersonCsvTemporaryTable(tEnv, "MyTable")
+
+    TestTableSourceSinks.createCsvTemporarySinkTable(
+      tEnv, new TableSchema(Array("first"), Array(STRING)), "MySink", -1)
+
+    val expected = TableTestUtil.readFromResource("/explain/testStreamTableEnvironmentExplain.out")
+    val actual = tEnv.explainSql("execute insert into MySink select first from MyTable")
+    assertEquals(TableTestUtil.replaceStageId(expected), TableTestUtil.replaceStageId(actual))

Review comment:
       Use assertj `assertThat().isEqualTo()`
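    
    Applied to this assertion, it would read roughly:
    
    ```
    assertThat(TableTestUtil.replaceStageId(actual))
      .isEqualTo(TableTestUtil.replaceStageId(expected))
    ```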

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
##########
@@ -145,19 +145,26 @@ class FlinkPlannerImpl(
       }
       sqlNode match {
         case richExplain: SqlRichExplain =>
-          val validatedStatement = richExplain.getStatement match {
-            case insert: RichSqlInsert =>
-              val validatedSource = validator.validate(insert.getSource)
-              insert.setOperand(2, validatedSource)
-              insert
-            case others =>
-              validator.validate(others)
-          }
-          richExplain.setOperand(0, validatedStatement)
+          richExplain.setOperand(0, validate(richExplain.getStatement))
           richExplain
-        // Insert requires validation but no row validation
-        case insert: SqlInsert =>
-          validator.validateInsert(insert)
+        case statementSet: SqlStatementSet =>
+          statementSet.getInserts.asScala.zipWithIndex.foreach {
+            case (insert, idx) => statementSet.setOperand(idx, validate(insert))
+          }
+          statementSet
+        case execute: SqlExecute =>
+          execute.setOperand(0, validate(execute.getStatement))
+          execute
+        case insert: RichSqlInsert =>
+          // We don't support UPSERT INTO semantics (see FLINK-24225).
+          if (insert.isUpsert) {
+            throw new ValidationException(
+              "UPSERT INTO statement is not supported. Please use INSERT INTO instead.")
+          }
+          // only validate source here.
+          // ignore row type which will be verified in table environment.
+          val validatedSource = validator.validate(insert.getSource)
+          insert.setOperand(2, validatedSource)

Review comment:
       On master this is not validated, correct?

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
##########
@@ -126,17 +125,4 @@ public void validateColumnListParams(
         // this makes it possible to ignore them in the validator and fall back to regular row types
         // see also SqlFunction#deriveType
     }
-

Review comment:
       Why have you removed this code?

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
##########
@@ -1387,6 +1389,50 @@ public void testSqlRichExplainWithInsert() {
         assertTrue(operation instanceof ExplainOperation);
     }
 
+    @Test
+    public void testSqlRichExplainWithStatementSet() {
+        final String sql =
+                "explain plan for statement set begin "
+                        + "insert into t1 select a, b, c, d from t2 where a > 1;"
+                        + "insert into t1 select a, b, c, d from t2 where a > 2;"
+                        + "end";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        Operation operation = parse(sql, planner, parser);
+        assertTrue(operation instanceof ExplainOperation);

Review comment:
       Use assertj `assertThat(operation).isInstanceOf(ExplainOperation.class)`

##########
File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dml/SqlStatementSet.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dml;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Statement Set. */

Review comment:
       Can you add a SQL statement example here?
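    
    For instance, a sketch based on the statements used in `SqlToOperationConverterTest` elsewhere in this PR:
    
    ```
    -- sketch; tables and columns borrowed from the tests in this PR
    STATEMENT SET BEGIN
      INSERT INTO t1 SELECT a, b, c, d FROM t2 WHERE a > 1;
      INSERT INTO t1 SELECT a, b, c, d FROM t2 WHERE a > 2;
    END;
    ```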

##########
File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/MatchRecognizeJsonPlanTest_jsonplan/testMatch.out
##########
@@ -427,7 +463,7 @@
         "cid" : "BIGINT"
       } ]
     },
-    "description" : "Match(orderBy=[proctime ASC], measures=[FINAL(A\".id) AS aid, FINAL(l.id) AS bid, FINAL(C.id) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$1, 0), _UTF-16LE'a'), l==(LAST(*.$1, 0), _UTF-16LE'b'), C==(LAST(*.$1, 0), _UTF-16LE'c')}])"
+    "description" : "Match(orderBy=[proctime ASC], measures=[FINAL(FINAL(A\".id)) AS aid, FINAL(FINAL(l.id)) AS bid, FINAL(FINAL(C.id)) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$1, 0), _UTF-16LE'a'), l==(LAST(*.$1, 0), _UTF-16LE'b'), C==(LAST(*.$1, 0), _UTF-16LE'c')}])"

Review comment:
       Why this plan change?

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -972,20 +987,21 @@ private Operation convertShowViews(SqlShowViews sqlShowViews) {
 
     /** Convert RICH EXPLAIN statement. */
     private Operation convertRichExplain(SqlRichExplain sqlExplain) {
-        Operation operation;
+        List<Operation> operations = new ArrayList<>();

Review comment:
       No need to instantiate this list; just assign `operations` in the if branches with either a singleton list or the list returned by `convertSqlStatementSet`.
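    
    A sketch of the suggested shape (`convertSingleStatement` is a placeholder for whatever the existing single-statement branch calls):
    
    ```
    List<Operation> operations;
    if (sqlNode instanceof SqlStatementSet) {
        operations = convertSqlStatementSet((SqlStatementSet) sqlNode);
    } else {
        // placeholder for the existing single-statement conversion
        operations = Collections.singletonList(convertSingleStatement(sqlNode));
    }
    ```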

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -972,20 +987,21 @@ private Operation convertShowViews(SqlShowViews sqlShowViews) {
 
     /** Convert RICH EXPLAIN statement. */
     private Operation convertRichExplain(SqlRichExplain sqlExplain) {
-        Operation operation;
+        List<Operation> operations = new ArrayList<>();
         SqlNode sqlNode = sqlExplain.getStatement();
         Set<String> explainDetails = sqlExplain.getExplainDetails();

Review comment:
       Inline this (use `sqlExplain.getExplainDetails()` directly at its single use site).

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
##########
@@ -167,6 +183,29 @@ class TableEnvironmentTest {
   }
 
   @Test
+  def testExecuteStatementSetExecutionExplain(): Unit = {
+    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    execEnv.setParallelism(1)
+    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
+    val tEnv = StreamTableEnvironment.create(execEnv, settings)
+
+    TestTableSourceSinks.createPersonCsvTemporaryTable(tEnv, "MyTable")
+
+    TestTableSourceSinks.createCsvTemporarySinkTable(
+      tEnv, new TableSchema(Array("first"), Array(STRING)), "MySink", -1)
+
+    val expected =
+      TableTestUtil.readFromResource("/explain/testStatementSetExecutionExplain.out")
+
+    val actual = tEnv.explainSql(
+      "execute statement set begin insert into MySink select last from MyTable; end",

Review comment:
       Can you add one test that shows the output of the explain with more than one insert?
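    
    For instance, something along these lines (reusing the tables from this test):
    
    ```
    execute statement set begin
      insert into MySink select first from MyTable;
      insert into MySink select last from MyTable;
    end
    ```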

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
##########
@@ -126,17 +125,4 @@ public void validateColumnListParams(
         // this makes it possible to ignore them in the validator and fall back to regular row types
         // see also SqlFunction#deriveType
     }
-

Review comment:
       I see the issue. I wonder if the proper nested validation of explain and statement set can happen here as well, rather than on the planner side, as this seems to be the place where the validation belongs. But I might be mistaken here.
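    
    As a rough sketch of the idea (illustration only, not tested):
    
    ```
    // rough sketch: unwrap EXPLAIN inside the validator itself instead of in the planner
    @Override
    public SqlNode validate(SqlNode topNode) {
        if (topNode instanceof SqlRichExplain) {
            SqlRichExplain explain = (SqlRichExplain) topNode;
            explain.setOperand(0, validate(explain.getStatement()));
            return explain;
        }
        return super.validate(topNode);
    }
    ```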

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -735,6 +735,14 @@ public Table sqlQuery(String query) {
     public TableResult executeSql(String statement) {
         List<Operation> operations = getParser().parse(statement);
 
+        if (operations.size() > 1
+                && operations.stream().allMatch(op -> op instanceof ModifyOperation)) {
+            // allow multi modification operation
+            return executeInternal(
+                    operations.stream()
+                            .map(op -> (ModifyOperation) op)
+                            .collect(Collectors.toList()));
+        }
         if (operations.size() != 1) {
             throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);

Review comment:
       The message of this exception doesn't make sense if `operations.size() > 1` but not all operations are `ModifyOperation`. Can you improve the error message in such a case?
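    
    For example, something along these lines (sketch; the exact message wording is illustrative):
    
    ```
    if (operations.size() > 1) {
        if (operations.stream().allMatch(op -> op instanceof ModifyOperation)) {
            // allow multiple modification operations, e.g. a statement set
            return executeInternal(
                    operations.stream()
                            .map(op -> (ModifyOperation) op)
                            .collect(Collectors.toList()));
        }
        // illustrative wording only
        throw new TableException(
                "Multiple SQL statements can only be executed together "
                        + "when all of them are INSERT statements.");
    }
    ```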




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org