Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/12 06:51:36 UTC

[GitHub] [flink] fsk119 commented on a diff in pull request #21187: [FLINK-27837][sql-gateway] Support statement set in the SQL Gateway

fsk119 commented on code in PR #21187:
URL: https://github.com/apache/flink/pull/21187#discussion_r1045350575


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -102,6 +109,8 @@ private SessionContext(
         this.userClassloader = classLoader;
         this.sessionState = sessionState;
         this.operationManager = operationManager;
+        isStatementSetState = false;
+        statementSetOperations = new ArrayList<>();

Review Comment:
   nit: It's better to align with the style above.
   
   ```
   this.isStatementSetState = false;
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -128,6 +137,26 @@ public SessionState getSessionState() {
         return sessionState;
     }
 
+    public boolean isStatementSetState() {
+        return isStatementSetState;
+    }
+
+    public void setStatementSetState(boolean isStatementSetState) {

Review Comment:
   How about introducing the following methods instead?
   
   ```
   enableStatementSetMode()
   disableStatementSetMode()
   ```
   
   



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -144,19 +145,29 @@ public ResultFetcher executeStatement(OperationHandle handle, String statement)
                             + "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block.");
         }
         Operation op = parsedOperations.get(0);
+        validate(op);
+
         if (op instanceof SetOperation) {
             return callSetOperation(tableEnv, handle, (SetOperation) op);
         } else if (op instanceof ResetOperation) {
             return callResetOperation(handle, (ResetOperation) op);
         } else if (op instanceof BeginStatementSetOperation) {
-            // TODO: support statement set in the FLINK-27837
-            throw new UnsupportedOperationException();
+            return callBeginStatementSetOperation(handle);
         } else if (op instanceof EndStatementSetOperation) {
-            // TODO: support statement set in the FLINK-27837
-            throw new UnsupportedOperationException();
+            return callEndStatementSetOperation(tableEnv, handle);
         } else if (op instanceof ModifyOperation) {
-            return callModifyOperations(
-                    tableEnv, handle, Collections.singletonList((ModifyOperation) op));
+            if (sessionContext.isStatementSetState()) {
+                // collect ModifyOperation to Statement Set
+                sessionContext.addStatementSetOperation((ModifyOperation) op);
+                return new ResultFetcher(
+                        handle,
+                        TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
+                        CollectionUtil.iteratorToList(
+                                TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+            } else {
+                return callModifyOperations(
+                        tableEnv, handle, Collections.singletonList((ModifyOperation) op));
+            }

Review Comment:
   Can we introduce a separate StatementSetExecutor to handle statement set execution? The StatementSet has its own logic, and I think the code would be much cleaner if we split it into two parts.
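
One possible shape for such a split is sketched below. This is only an illustration: the class name `StatementSetExecutorSketch`, its methods, and the generic `submitter` function are hypothetical and are not part of the PR or of Flink. The sketch buffers operations between begin and end, and resets its state before submitting so that a failed submission does not leave the session stuck in statement set mode.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch, not Flink code: OP stands in for ModifyOperation,
// R for the result type (for example ResultFetcher).
class StatementSetExecutorSketch<OP, R> {
    private final Function<List<OP>, R> submitter;
    private final List<OP> buffered = new ArrayList<>();
    private boolean active = false;

    StatementSetExecutorSketch(Function<List<OP>, R> submitter) {
        this.submitter = submitter;
    }

    // Corresponds to BEGIN STATEMENT SET.
    void begin() {
        this.active = true;
    }

    boolean isActive() {
        return active;
    }

    // Collects one operation while the statement set is open.
    void add(OP operation) {
        if (!active) {
            throw new IllegalStateException("No statement set is open.");
        }
        buffered.add(operation);
    }

    // Corresponds to END: submits all buffered operations as one unit.
    R end() {
        if (!active) {
            throw new IllegalStateException("No statement set is open.");
        }
        List<OP> toSubmit = new ArrayList<>(buffered);
        // Reset first, so the state is clean even if submission throws.
        this.active = false;
        buffered.clear();
        return submitter.apply(toSubmit);
    }
}
```

With this shape, `OperationExecutor` would only decide whether to delegate to the statement set path, while the buffering and reset logic would live in one place.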



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -330,6 +341,34 @@ private ResultFetcher callResetOperation(OperationHandle handle, ResetOperation
                         TableResultInternal.TABLE_RESULT_OK.collectInternal()));
     }
 
+    private ResultFetcher callBeginStatementSetOperation(OperationHandle handle) {
+        sessionContext.setStatementSetState(true);
+        return new ResultFetcher(
+                handle,
+                TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
+                CollectionUtil.iteratorToList(
+                        TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+    }
+
+    private ResultFetcher callEndStatementSetOperation(
+            TableEnvironmentInternal tableEnv, OperationHandle handle) {
+        // reset the state regardless of whether error occurs while executing the set
+        sessionContext.setStatementSetState(false);

Review Comment:
   ```
    sessionContext.setStatementSetState(false);
    sessionContext.clearStatementSetOperations();
   ```
   
   is equivalent to `sessionContext.disableStatementSetMode()`.
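
A minimal sketch of how the suggested pair of methods could look is given below. It assumes a simplified stand-in for `SessionContext` (the class name `StatementSetModeSketch` is hypothetical) and uses plain strings in place of `ModifyOperation`; only the method names `enableStatementSetMode` and `disableStatementSetMode` come from the review comments.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for SessionContext.
class StatementSetModeSketch {
    private boolean isStatementSetState;
    private final List<String> statementSetOperations;

    StatementSetModeSketch() {
        this.isStatementSetState = false;
        this.statementSetOperations = new ArrayList<>();
    }

    boolean isStatementSetState() {
        return isStatementSetState;
    }

    // Replaces setStatementSetState(true).
    void enableStatementSetMode() {
        this.isStatementSetState = true;
    }

    // Replaces setStatementSetState(false) followed by
    // clearStatementSetOperations(), so callers cannot forget the clear.
    void disableStatementSetMode() {
        this.isStatementSetState = false;
        this.statementSetOperations.clear();
    }

    void addStatementSetOperation(String operation) {
        statementSetOperations.add(operation);
    }

    List<String> getStatementSetOperations() {
        return Collections.unmodifiableList(statementSetOperations);
    }
}
```

Folding the clear into `disableStatementSetMode()` keeps the flag and the buffered operations consistent with each other at every call site.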
   



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -128,6 +137,26 @@ public SessionState getSessionState() {
         return sessionState;
     }
 
+    public boolean isStatementSetState() {
+        return isStatementSetState;
+    }
+
+    public void setStatementSetState(boolean isStatementSetState) {
+        this.isStatementSetState = isStatementSetState;
+    }
+
+    public List<ModifyOperation> getStatementSetOperations() {
+        return Collections.unmodifiableList(new ArrayList<>(statementSetOperations));

Review Comment:
   I think 
   ```
           return Collections.unmodifiableList(statementSetOperations);
   ```
   
   is enough.
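
For context, the two variants differ in one respect: `Collections.unmodifiableList(list)` returns a read-only view that reflects later changes to the backing list, whereas wrapping a fresh `ArrayList` copy returns a read-only snapshot. The small demo below illustrates this with plain strings; the class and method names are made up for the example, and whether the difference matters depends on how callers use the returned list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative demo only; names are hypothetical.
class UnmodifiableViewDemo {
    // Read-only view: no copy, later changes to 'backing' are visible.
    static List<String> view(List<String> backing) {
        return Collections.unmodifiableList(backing);
    }

    // Read-only snapshot: copies the elements at call time.
    static List<String> snapshot(List<String> backing) {
        return Collections.unmodifiableList(new ArrayList<>(backing));
    }

    public static void main(String[] args) {
        List<String> operations = new ArrayList<>();
        operations.add("INSERT 1");

        List<String> view = view(operations);
        List<String> snapshot = snapshot(operations);

        operations.add("INSERT 2");

        System.out.println(view.size());     // prints 2
        System.out.println(snapshot.size()); // prints 1
    }
}
```

In both cases, calling `add` on the returned list throws `UnsupportedOperationException`.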



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java:
##########
@@ -133,6 +139,70 @@ public void testSetAndResetArbitraryKey() {
         assertThat(sessionContext.getConfigMap().get("bb")).isNull();
     }
 
+    @Test
+    public void testStatementSetStateTransition() {
+        assertThat(sessionContext.isStatementSetState()).isFalse();
+
+        OperationExecutor executor = sessionContext.createOperationExecutor(new Configuration());
+        TableEnvironmentInternal tableEnv = executor.getTableEnvironment();
+        tableEnv.executeSql("CREATE TABLE whatever (f STRING) WITH ('connector' = 'values')");
+
+        executor.executeStatement(OperationHandle.create(), "BEGIN STATEMENT SET;");
+
+        assertThat(sessionContext.isStatementSetState()).isTrue();
+
+        // invalid statement in Statement Set
+        assertThatThrownBy(() -> executor.executeStatement(OperationHandle.create(), "SELECT 1;"))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                SqlExecutionException.class,
+                                "Wrong statement after 'BEGIN STATEMENT SET'.\n"
+                                        + "Only 'INSERT/CREATE TABLE AS' statement is allowed in Statement Set or use 'END' statement to terminate Statement Set."));
+
+        // 'isStatementSetState' is still true, and nothing in 'statementSetOperations'
+        assertThat(sessionContext.isStatementSetState()).isTrue();
+        assertThat(sessionContext.getStatementSetOperations().size()).isEqualTo(0);
+
+        // valid statement in Statement Set
+        String insert = "INSERT INTO whatever VALUES('test%s');";
+        int repeat = 3;
+        for (int i = 0; i < repeat; i++) {
+            executor.executeStatement(OperationHandle.create(), String.format(insert, i));
+        }
+
+        assertThat(sessionContext.getStatementSetOperations().size()).isEqualTo(repeat);
+        for (int i = 0; i < repeat; i++) {
+            assertThat(sessionContext.getStatementSetOperations().get(i).asSummaryString())
+                    .isEqualTo(
+                            tableEnv.getParser()
+                                    .parse(String.format(insert, i))
+                                    .get(0)
+                                    .asSummaryString());
+        }
+
+        // end Statement Set
+        try {
+            executor.executeStatement(OperationHandle.create(), "END;");
+        } catch (Throwable t) {
+            // just test the Statement Set state transition, so ignore the error that cluster
+            // doesn't exist
+        }

Review Comment:
   If so, I think you can move this part into SqlGatewayServiceITCase.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -405,4 +444,22 @@ private Set<TableInfo> listViews(String catalogName, String databaseName) {
                                                 TableKind.VIEW))
                         .collect(Collectors.toSet()));
     }
+
+    private void validate(Operation operation) {
+        if (sessionContext.isStatementSetState()) {
+            if (!(operation instanceof SinkModifyOperation
+                    || operation instanceof EndStatementSetOperation
+                    || operation instanceof CreateTableASOperation)) {
+                throw new SqlExecutionException(
+                        "Wrong statement after 'BEGIN STATEMENT SET'.\n"
+                                + "Only 'INSERT/CREATE TABLE AS' statement is allowed in Statement Set or use 'END' statement to terminate Statement Set.");

Review Comment:
   I think 
   
   ```
   Only 'INSERT/CREATE TABLE AS' statement is allowed in Statement Set or use 'END' statement to submit Statement Set.
   ```
   
   is enough.



##########
flink-table/flink-sql-gateway/src/test/resources/sql/begin_statement_set.q:
##########
@@ -0,0 +1,341 @@
+# statement-set.q - BEGIN STATEMENT SET, END

Review Comment:
   begin-statement-set.q ?


