Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/28 09:38:55 UTC

[GitHub] [flink] yuzelin opened a new pull request, #21187: [FLINK-27837][sql-gateway] Support statement set in the SQL Gateway

yuzelin opened a new pull request, #21187:
URL: https://github.com/apache/flink/pull/21187

   
   ## What is the purpose of the change
   This pull request adds support for the 'BEGIN STATEMENT SET' syntax in the SQL Gateway.
   
   
   
   ## Brief change log
     - Implement `callBeginStatementSetOperation` and `callEndStatementSetOperation` in `OperationExecutor`.
     - Add tests in `SessionContextTest` and `SqlGatewayServiceITCase`.
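
The flow described in the change log can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the pull request: `StatementSetFlowSketch`, its string-based statement matching, and the returned status strings are all hypothetical stand-ins for the gateway's `SessionContext` and `OperationExecutor`, and `CREATE TABLE AS` is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Hypothetical stand-in for per-session statement set handling; not Flink's API. */
final class StatementSetFlowSketch {
    private boolean inStatementSet;
    private final List<String> buffered = new ArrayList<>();

    /** Returns "OK" for accepted or buffered statements and "SUBMITTED n" when n statements run. */
    String execute(String statement) {
        String normalized = statement.trim().toUpperCase(Locale.ROOT);
        if (normalized.startsWith("BEGIN STATEMENT SET")) {
            inStatementSet = true;
            return "OK";
        }
        if (normalized.startsWith("END")) {
            if (!inStatementSet) {
                throw new IllegalStateException("'END' used without 'BEGIN STATEMENT SET'.");
            }
            int count = buffered.size();
            // Reset the state first, so a failing submission cannot leave the session stuck.
            inStatementSet = false;
            buffered.clear();
            return "SUBMITTED " + count;
        }
        if (normalized.startsWith("INSERT")) {
            if (inStatementSet) {
                // Inside a statement set the statement is only collected, not executed.
                buffered.add(statement);
                return "OK";
            }
            return "SUBMITTED 1";
        }
        if (inStatementSet) {
            throw new IllegalStateException(
                    "Only 'INSERT' is allowed in a statement set, or use 'END' to submit it.");
        }
        return "OK";
    }

    boolean isInStatementSet() {
        return inStatementSet;
    }

    int bufferedCount() {
        return buffered.size();
    }

    public static void main(String[] args) {
        StatementSetFlowSketch session = new StatementSetFlowSketch();
        System.out.println(session.execute("BEGIN STATEMENT SET;"));
        System.out.println(session.execute("INSERT INTO whatever VALUES('a');"));
        System.out.println(session.execute("INSERT INTO whatever VALUES('b');"));
        System.out.println(session.execute("END;")); // prints "SUBMITTED 2"
    }
}
```

In this sketch a rejected statement leaves the buffered statements and the statement set flag untouched, which mirrors the behavior the new unit test asserts.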
   
   
   ## Verifying this change
   This change added tests and can be verified as follows:
     - New test method `SessionContextTest#testStatementSetStateTransition`.
     - New 'begin_statement_set.q' test file in resources/sql that can be verified by `SqlGatewayServiceITCase`.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **no**
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **yes** 
     - If yes, how is the feature documented? **JavaDocs** 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] fsk119 commented on pull request #21187: [FLINK-27837][sql-gateway] Support statement set in the SQL Gateway

Posted by GitBox <gi...@apache.org>.
fsk119 commented on PR #21187:
URL: https://github.com/apache/flink/pull/21187#issuecomment-1360718894

   @flinkbot run azure




[GitHub] [flink] fsk119 closed pull request #21187: [FLINK-27837][sql-gateway] Support statement set in the SQL Gateway

Posted by GitBox <gi...@apache.org>.
fsk119 closed pull request #21187: [FLINK-27837][sql-gateway] Support statement set in the SQL Gateway
URL: https://github.com/apache/flink/pull/21187




[GitHub] [flink] yuzelin commented on a diff in pull request #21187: [FLINK-27837][sql-gateway] Support statement set in the SQL Gateway

Posted by GitBox <gi...@apache.org>.
yuzelin commented on code in PR #21187:
URL: https://github.com/apache/flink/pull/21187#discussion_r1022505487


##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java:
##########
@@ -133,6 +138,79 @@ public void testSetAndResetArbitraryKey() {
         assertThat(sessionContext.getConfigMap().get("bb")).isNull();
     }
 
+    @Test
+    public void testStatementSetStateTransition() {
+        assertThat(sessionContext.isStatementSetState()).isFalse();
+
+        Configuration emptyConfiguration = new Configuration();
+        TableEnvironmentInternal tableEnv =
+                sessionContext.createOperationExecutor(emptyConfiguration).getTableEnvironment();
+        tableEnv.executeSql("CREATE TABLE whatever (f STRING) WITH ('connector' = 'values')");
+
+        sessionContext
+                .createOperationExecutor(emptyConfiguration)
+                .executeStatement(OperationHandle.create(), "BEGIN STATEMENT SET;");
+
+        assertThat(sessionContext.isStatementSetState()).isTrue();
+
+        // invalid statement in Statement Set
+        assertThatThrownBy(
+                        () ->
+                                sessionContext
+                                        .createOperationExecutor(emptyConfiguration)
+                                        .executeStatement(OperationHandle.create(), "SELECT 1;"))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                SqlExecutionException.class,
+                                "Wrong statement after 'BEGIN STATEMENT SET'.\n"
+                                        + "Only 'INSERT/CREATE TABLE AS' statement is allowed in Statement Set or use 'END' statement to terminate Statement Set."));
+
+        // 'isStatementSetState' is still true, and nothing in 'statementSetOperations'
+        assertThat(sessionContext.isStatementSetState()).isTrue();
+        assertThat(sessionContext.getStatementSetOperations().size()).isEqualTo(0);
+
+        // valid statement in Statement Set
+        sessionContext
+                .createOperationExecutor(emptyConfiguration)
+                .executeStatement(
+                        OperationHandle.create(), "INSERT INTO whatever VALUES('Hello World');");

Review Comment:
   Yes, it is worth testing. I forgot to test this case. The test is improved now.





[GitHub] [flink] yuzelin commented on pull request #21187: [FLINK-27837][sql-gateway] Support statement set in the SQL Gateway

Posted by GitBox <gi...@apache.org>.
yuzelin commented on PR #21187:
URL: https://github.com/apache/flink/pull/21187#issuecomment-1337455964

   > Which branch will support this?
   @WuQic Hi, sorry for the late response. It is expected to be supported in release 1.17.
   
   




[GitHub] [flink] yuzelin commented on pull request #21187: [FLINK-27837][sql-gateway] Support statement set in the SQL Gateway

Posted by GitBox <gi...@apache.org>.
yuzelin commented on PR #21187:
URL: https://github.com/apache/flink/pull/21187#issuecomment-1308294522

   cc @fsk119 




[GitHub] [flink] yuzelin commented on a diff in pull request #21187: [FLINK-27837][sql-gateway] Support statement set in the SQL Gateway

Posted by GitBox <gi...@apache.org>.
yuzelin commented on code in PR #21187:
URL: https://github.com/apache/flink/pull/21187#discussion_r1046600050


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -128,6 +137,26 @@ public SessionState getSessionState() {
         return sessionState;
     }
 
+    public boolean isStatementSetState() {
+        return isStatementSetState;
+    }
+
+    public void setStatementSetState(boolean isStatementSetState) {

Review Comment:
   Good idea.





[GitHub] [flink] flinkbot commented on pull request #21187: [FLINK-27837][sql-gateway] Support statement set in the SQL Gateway

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21187:
URL: https://github.com/apache/flink/pull/21187#issuecomment-1294782997

   ## CI report:
   
   * 4d94f14b9d8227609838d9803cc7d5560a1ad582 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] yuzelin commented on a diff in pull request #21187: [FLINK-27837][sql-gateway] Support statement set in the SQL Gateway

Posted by GitBox <gi...@apache.org>.
yuzelin commented on code in PR #21187:
URL: https://github.com/apache/flink/pull/21187#discussion_r1046600712


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -405,4 +444,22 @@ private Set<TableInfo> listViews(String catalogName, String databaseName) {
                                                 TableKind.VIEW))
                         .collect(Collectors.toSet()));
     }
+
+    private void validate(Operation operation) {
+        if (sessionContext.isStatementSetState()) {
+            if (!(operation instanceof SinkModifyOperation
+                    || operation instanceof EndStatementSetOperation
+                    || operation instanceof CreateTableASOperation)) {
+                throw new SqlExecutionException(
+                        "Wrong statement after 'BEGIN STATEMENT SET'.\n"
+                                + "Only 'INSERT/CREATE TABLE AS' statement is allowed in Statement Set or use 'END' statement to terminate Statement Set.");

Review Comment:
   OK.





[GitHub] [flink] yuzelin commented on a diff in pull request #21187: [FLINK-27837][sql-gateway] Support statement set in the SQL Gateway

Posted by GitBox <gi...@apache.org>.
yuzelin commented on code in PR #21187:
URL: https://github.com/apache/flink/pull/21187#discussion_r1046601074


##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java:
##########
@@ -133,6 +139,70 @@ public void testSetAndResetArbitraryKey() {
         assertThat(sessionContext.getConfigMap().get("bb")).isNull();
     }
 
+    @Test
+    public void testStatementSetStateTransition() {
+        assertThat(sessionContext.isStatementSetState()).isFalse();
+
+        OperationExecutor executor = sessionContext.createOperationExecutor(new Configuration());
+        TableEnvironmentInternal tableEnv = executor.getTableEnvironment();
+        tableEnv.executeSql("CREATE TABLE whatever (f STRING) WITH ('connector' = 'values')");
+
+        executor.executeStatement(OperationHandle.create(), "BEGIN STATEMENT SET;");
+
+        assertThat(sessionContext.isStatementSetState()).isTrue();
+
+        // invalid statement in Statement Set
+        assertThatThrownBy(() -> executor.executeStatement(OperationHandle.create(), "SELECT 1;"))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                SqlExecutionException.class,
+                                "Wrong statement after 'BEGIN STATEMENT SET'.\n"
+                                        + "Only 'INSERT/CREATE TABLE AS' statement is allowed in Statement Set or use 'END' statement to terminate Statement Set."));
+
+        // 'isStatementSetState' is still true, and nothing in 'statementSetOperations'
+        assertThat(sessionContext.isStatementSetState()).isTrue();
+        assertThat(sessionContext.getStatementSetOperations().size()).isEqualTo(0);
+
+        // valid statement in Statement Set
+        String insert = "INSERT INTO whatever VALUES('test%s');";
+        int repeat = 3;
+        for (int i = 0; i < repeat; i++) {
+            executor.executeStatement(OperationHandle.create(), String.format(insert, i));
+        }
+
+        assertThat(sessionContext.getStatementSetOperations().size()).isEqualTo(repeat);
+        for (int i = 0; i < repeat; i++) {
+            assertThat(sessionContext.getStatementSetOperations().get(i).asSummaryString())
+                    .isEqualTo(
+                            tableEnv.getParser()
+                                    .parse(String.format(insert, i))
+                                    .get(0)
+                                    .asSummaryString());
+        }
+
+        // end Statement Set
+        try {
+            executor.executeStatement(OperationHandle.create(), "END;");
+        } catch (Throwable t) {
+            // just test the Statement Set state transition, so ignore the error that cluster
+            // doesn't exist
+        }

Review Comment:
   I now think the SqlGatewayServiceStatementITCase is enough, so I removed the test.





[GitHub] [flink] yuzelin commented on a diff in pull request #21187: [FLINK-27837][sql-gateway] Support statement set in the SQL Gateway

Posted by GitBox <gi...@apache.org>.
yuzelin commented on code in PR #21187:
URL: https://github.com/apache/flink/pull/21187#discussion_r1046600266


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -144,19 +145,29 @@ public ResultFetcher executeStatement(OperationHandle handle, String statement)
                             + "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block.");
         }
         Operation op = parsedOperations.get(0);
+        validate(op);
+
         if (op instanceof SetOperation) {
             return callSetOperation(tableEnv, handle, (SetOperation) op);
         } else if (op instanceof ResetOperation) {
             return callResetOperation(handle, (ResetOperation) op);
         } else if (op instanceof BeginStatementSetOperation) {
-            // TODO: support statement set in the FLINK-27837
-            throw new UnsupportedOperationException();
+            return callBeginStatementSetOperation(handle);
         } else if (op instanceof EndStatementSetOperation) {
-            // TODO: support statement set in the FLINK-27837
-            throw new UnsupportedOperationException();
+            return callEndStatementSetOperation(tableEnv, handle);
         } else if (op instanceof ModifyOperation) {
-            return callModifyOperations(
-                    tableEnv, handle, Collections.singletonList((ModifyOperation) op));
+            if (sessionContext.isStatementSetState()) {
+                // collect ModifyOperation to Statement Set
+                sessionContext.addStatementSetOperation((ModifyOperation) op);
+                return new ResultFetcher(
+                        handle,
+                        TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
+                        CollectionUtil.iteratorToList(
+                                TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+            } else {
+                return callModifyOperations(
+                        tableEnv, handle, Collections.singletonList((ModifyOperation) op));
+            }

Review Comment:
   Improved.





[GitHub] [flink] yuzelin commented on a diff in pull request #21187: [FLINK-27837][sql-gateway] Support statement set in the SQL Gateway

Posted by GitBox <gi...@apache.org>.
yuzelin commented on code in PR #21187:
URL: https://github.com/apache/flink/pull/21187#discussion_r1046597242


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -128,6 +137,26 @@ public SessionState getSessionState() {
         return sessionState;
     }
 
+    public boolean isStatementSetState() {
+        return isStatementSetState;
+    }
+
+    public void setStatementSetState(boolean isStatementSetState) {
+        this.isStatementSetState = isStatementSetState;
+    }
+
+    public List<ModifyOperation> getStatementSetOperations() {
+        return Collections.unmodifiableList(new ArrayList<>(statementSetOperations));

Review Comment:
   This is because disableStatementSetMode() clears statementSetOperations, so a new list should be returned here.
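
The difference between returning a copy and returning a view can be illustrated with plain JDK collections. The sketch below is an assumption-labeled example, not gateway code: `SnapshotVersusViewSketch` and its `view()` / `snapshot()` methods are hypothetical names. `Collections.unmodifiableList(list)` wraps the backing list, so a later `clear()` on the backing list is visible through the wrapper, whereas copying into a new `ArrayList` first decouples the result.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical holder that contrasts an unmodifiable view with an unmodifiable copy. */
final class SnapshotVersusViewSketch {
    private final List<String> operations = new ArrayList<>();

    void add(String operation) {
        operations.add(operation);
    }

    void clear() {
        operations.clear();
    }

    /** Read-only view: reflects later changes to the backing list. */
    List<String> view() {
        return Collections.unmodifiableList(operations);
    }

    /** Read-only snapshot: unaffected by later changes to the backing list. */
    List<String> snapshot() {
        return Collections.unmodifiableList(new ArrayList<>(operations));
    }

    public static void main(String[] args) {
        SnapshotVersusViewSketch holder = new SnapshotVersusViewSketch();
        holder.add("INSERT 1");
        holder.add("INSERT 2");

        List<String> view = holder.view();
        List<String> snapshot = holder.snapshot();

        // Clearing the backing list empties the view but not the snapshot.
        holder.clear();

        System.out.println(view.size());     // prints 0
        System.out.println(snapshot.size()); // prints 2
    }
}
```

Whether the copy is needed depends on whether callers hold on to the returned list across the call that clears the state.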





[GitHub] [flink] fsk119 commented on a diff in pull request #21187: [FLINK-27837][sql-gateway] Support statement set in the SQL Gateway

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #21187:
URL: https://github.com/apache/flink/pull/21187#discussion_r1045350575


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -102,6 +109,8 @@ private SessionContext(
         this.userClassloader = classLoader;
         this.sessionState = sessionState;
         this.operationManager = operationManager;
+        isStatementSetState = false;
+        statementSetOperations = new ArrayList<>();

Review Comment:
   nit: It's better to align with the style above.
   
   ```
   this.isStatementSetState = false;
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -128,6 +137,26 @@ public SessionState getSessionState() {
         return sessionState;
     }
 
+    public boolean isStatementSetState() {
+        return isStatementSetState;
+    }
+
+    public void setStatementSetState(boolean isStatementSetState) {

Review Comment:
   How about introducing 
   
   ```
   enableStatementSetMode()
   disableStatementSetMode()
   ```
   
   



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -144,19 +145,29 @@ public ResultFetcher executeStatement(OperationHandle handle, String statement)
                             + "multiple 'INSERT INTO' statements wrapped in a 'STATEMENT SET' block.");
         }
         Operation op = parsedOperations.get(0);
+        validate(op);
+
         if (op instanceof SetOperation) {
             return callSetOperation(tableEnv, handle, (SetOperation) op);
         } else if (op instanceof ResetOperation) {
             return callResetOperation(handle, (ResetOperation) op);
         } else if (op instanceof BeginStatementSetOperation) {
-            // TODO: support statement set in the FLINK-27837
-            throw new UnsupportedOperationException();
+            return callBeginStatementSetOperation(handle);
         } else if (op instanceof EndStatementSetOperation) {
-            // TODO: support statement set in the FLINK-27837
-            throw new UnsupportedOperationException();
+            return callEndStatementSetOperation(tableEnv, handle);
         } else if (op instanceof ModifyOperation) {
-            return callModifyOperations(
-                    tableEnv, handle, Collections.singletonList((ModifyOperation) op));
+            if (sessionContext.isStatementSetState()) {
+                // collect ModifyOperation to Statement Set
+                sessionContext.addStatementSetOperation((ModifyOperation) op);
+                return new ResultFetcher(
+                        handle,
+                        TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
+                        CollectionUtil.iteratorToList(
+                                TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+            } else {
+                return callModifyOperations(
+                        tableEnv, handle, Collections.singletonList((ModifyOperation) op));
+            }

Review Comment:
   Can we introduce a separate StatementSetExecutor to do the statement execution? I think the StatementSet has its own logic, and it would be much cleaner if we divided this into two parts.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -330,6 +341,34 @@ private ResultFetcher callResetOperation(OperationHandle handle, ResetOperation
                         TableResultInternal.TABLE_RESULT_OK.collectInternal()));
     }
 
+    private ResultFetcher callBeginStatementSetOperation(OperationHandle handle) {
+        sessionContext.setStatementSetState(true);
+        return new ResultFetcher(
+                handle,
+                TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
+                CollectionUtil.iteratorToList(
+                        TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+    }
+
+    private ResultFetcher callEndStatementSetOperation(
+            TableEnvironmentInternal tableEnv, OperationHandle handle) {
+        // reset the state regardless of whether error occurs while executing the set
+        sessionContext.setStatementSetState(false);

Review Comment:
   ```
    sessionContext.setStatementSetState(false);
    sessionContext.clearStatementSetOperations();
   ```
   
   is equivalent to `sessionContext.disableStatementSetMode()`
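
One possible shape for the suggested pair of methods is sketched below. This is only an illustration under stated assumptions: `SessionStateSketch` is a hypothetical class, operations are modeled as plain strings rather than `ModifyOperation`, and only the method names `enableStatementSetMode`, `disableStatementSetMode`, `addStatementSetOperation`, and `isStatementSetState` come from the discussion.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical session state; models operations as strings instead of ModifyOperation. */
final class SessionStateSketch {
    private boolean isStatementSetState;
    private final List<String> statementSetOperations = new ArrayList<>();

    boolean isStatementSetState() {
        return isStatementSetState;
    }

    void enableStatementSetMode() {
        isStatementSetState = true;
    }

    /** Resets the flag and drops collected operations in one step, so they cannot diverge. */
    void disableStatementSetMode() {
        isStatementSetState = false;
        statementSetOperations.clear();
    }

    void addStatementSetOperation(String operation) {
        if (!isStatementSetState) {
            throw new IllegalStateException("Statement set mode is not enabled.");
        }
        statementSetOperations.add(operation);
    }

    int statementSetOperationCount() {
        return statementSetOperations.size();
    }

    public static void main(String[] args) {
        SessionStateSketch session = new SessionStateSketch();
        session.enableStatementSetMode();
        session.addStatementSetOperation("INSERT INTO whatever VALUES('a')");
        session.disableStatementSetMode();
        System.out.println(session.isStatementSetState());        // prints false
        System.out.println(session.statementSetOperationCount()); // prints 0
    }
}
```

Compared with a boolean setter, the paired methods keep the flag and the collected operations consistent, because callers cannot reset one without the other.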
   



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java:
##########
@@ -128,6 +137,26 @@ public SessionState getSessionState() {
         return sessionState;
     }
 
+    public boolean isStatementSetState() {
+        return isStatementSetState;
+    }
+
+    public void setStatementSetState(boolean isStatementSetState) {
+        this.isStatementSetState = isStatementSetState;
+    }
+
+    public List<ModifyOperation> getStatementSetOperations() {
+        return Collections.unmodifiableList(new ArrayList<>(statementSetOperations));

Review Comment:
   I think 
   ```
           return Collections.unmodifiableList(statementSetOperations);
   ```
   
   is enough.



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java:
##########
@@ -133,6 +139,70 @@ public void testSetAndResetArbitraryKey() {
         assertThat(sessionContext.getConfigMap().get("bb")).isNull();
     }
 
+    @Test
+    public void testStatementSetStateTransition() {
+        assertThat(sessionContext.isStatementSetState()).isFalse();
+
+        OperationExecutor executor = sessionContext.createOperationExecutor(new Configuration());
+        TableEnvironmentInternal tableEnv = executor.getTableEnvironment();
+        tableEnv.executeSql("CREATE TABLE whatever (f STRING) WITH ('connector' = 'values')");
+
+        executor.executeStatement(OperationHandle.create(), "BEGIN STATEMENT SET;");
+
+        assertThat(sessionContext.isStatementSetState()).isTrue();
+
+        // invalid statement in Statement Set
+        assertThatThrownBy(() -> executor.executeStatement(OperationHandle.create(), "SELECT 1;"))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                SqlExecutionException.class,
+                                "Wrong statement after 'BEGIN STATEMENT SET'.\n"
+                                        + "Only 'INSERT/CREATE TABLE AS' statement is allowed in Statement Set or use 'END' statement to terminate Statement Set."));
+
+        // 'isStatementSetState' is still true, and nothing in 'statementSetOperations'
+        assertThat(sessionContext.isStatementSetState()).isTrue();
+        assertThat(sessionContext.getStatementSetOperations().size()).isEqualTo(0);
+
+        // valid statement in Statement Set
+        String insert = "INSERT INTO whatever VALUES('test%s');";
+        int repeat = 3;
+        for (int i = 0; i < repeat; i++) {
+            executor.executeStatement(OperationHandle.create(), String.format(insert, i));
+        }
+
+        assertThat(sessionContext.getStatementSetOperations().size()).isEqualTo(repeat);
+        for (int i = 0; i < repeat; i++) {
+            assertThat(sessionContext.getStatementSetOperations().get(i).asSummaryString())
+                    .isEqualTo(
+                            tableEnv.getParser()
+                                    .parse(String.format(insert, i))
+                                    .get(0)
+                                    .asSummaryString());
+        }
+
+        // end Statement Set
+        try {
+            executor.executeStatement(OperationHandle.create(), "END;");
+        } catch (Throwable t) {
+            // just test the Statement Set state transition, so ignore the error that cluster
+            // doesn't exist
+        }

Review Comment:
   If so, I think you can move this part into SqlGatewayServiceITCase.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -405,4 +444,22 @@ private Set<TableInfo> listViews(String catalogName, String databaseName) {
                                                 TableKind.VIEW))
                         .collect(Collectors.toSet()));
     }
+
+    private void validate(Operation operation) {
+        if (sessionContext.isStatementSetState()) {
+            if (!(operation instanceof SinkModifyOperation
+                    || operation instanceof EndStatementSetOperation
+                    || operation instanceof CreateTableASOperation)) {
+                throw new SqlExecutionException(
+                        "Wrong statement after 'BEGIN STATEMENT SET'.\n"
+                                + "Only 'INSERT/CREATE TABLE AS' statement is allowed in Statement Set or use 'END' statement to terminate Statement Set.");

Review Comment:
   I think 
   
   ```
   Only 'INSERT/CREATE TABLE AS' statement is allowed in Statement Set or use 'END' statement to submit Statement Set.
   ```
   
   is enough.



##########
flink-table/flink-sql-gateway/src/test/resources/sql/begin_statement_set.q:
##########
@@ -0,0 +1,341 @@
+# statement-set.q - BEGIN STATEMENT SET, END

Review Comment:
   begin-statement-set.q ?





[GitHub] [flink] WuQic commented on pull request #21187: [FLINK-27837][sql-gateway] Support statement set in the SQL Gateway

Posted by GitBox <gi...@apache.org>.
WuQic commented on PR #21187:
URL: https://github.com/apache/flink/pull/21187#issuecomment-1327185649

   Which branches will support this?




[GitHub] [flink] jnh5y commented on a diff in pull request #21187: [FLINK-27837][sql-gateway] Support statement set in the SQL Gateway

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #21187:
URL: https://github.com/apache/flink/pull/21187#discussion_r1021916806


##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java:
##########
@@ -133,6 +138,79 @@ public void testSetAndResetArbitraryKey() {
         assertThat(sessionContext.getConfigMap().get("bb")).isNull();
     }
 
+    @Test
+    public void testStatementSetStateTransition() {
+        assertThat(sessionContext.isStatementSetState()).isFalse();
+
+        Configuration emptyConfiguration = new Configuration();
+        TableEnvironmentInternal tableEnv =
+                sessionContext.createOperationExecutor(emptyConfiguration).getTableEnvironment();
+        tableEnv.executeSql("CREATE TABLE whatever (f STRING) WITH ('connector' = 'values')");
+
+        sessionContext
+                .createOperationExecutor(emptyConfiguration)
+                .executeStatement(OperationHandle.create(), "BEGIN STATEMENT SET;");
+
+        assertThat(sessionContext.isStatementSetState()).isTrue();
+
+        // invalid statement in Statement Set
+        assertThatThrownBy(
+                        () ->
+                                sessionContext
+                                        .createOperationExecutor(emptyConfiguration)
+                                        .executeStatement(OperationHandle.create(), "SELECT 1;"))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                SqlExecutionException.class,
+                                "Wrong statement after 'BEGIN STATEMENT SET'.\n"
+                                        + "Only 'INSERT/CREATE TABLE AS' statement is allowed in Statement Set or use 'END' statement to terminate Statement Set."));
+
+        // 'isStatementSetState' is still true, and nothing in 'statementSetOperations'
+        assertThat(sessionContext.isStatementSetState()).isTrue();
+        assertThat(sessionContext.getStatementSetOperations().size()).isEqualTo(0);
+
+        // valid statement in Statement Set
+        sessionContext
+                .createOperationExecutor(emptyConfiguration)
+                .executeStatement(
+                        OperationHandle.create(), "INSERT INTO whatever VALUES('Hello World');");

Review Comment:
   Is it worth testing multiple statements in the statement set? Maybe the integration test covers that?


