Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/11 07:52:21 UTC

[GitHub] [flink] link3280 opened a new pull request, #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

link3280 opened a new pull request, #21292:
URL: https://github.com/apache/flink/pull/21292

   ## What is the purpose of the change
   
   Support the `STOP JOB` statement in SqlGatewayService. This is a subtask of FLIP-222.
   
   
   ## Brief change log
   
     - *Support the STOP JOB statement in SqlGatewayService*
     - *Extend SqlGatewayServiceExtension to include a mini cluster for statement execution*
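For reference, the statement takes the form `STOP JOB '<job_id>'`, optionally followed by `WITH SAVEPOINT` and `WITH DRAIN`; the integration test in this PR covers `WITH SAVEPOINT`, `WITH SAVEPOINT WITH DRAIN`, and no option. The sketch below composes such statements from flags. `StopJobStatements.build` is a hypothetical helper, not a Flink API, and it rejects `WITH DRAIN` without `WITH SAVEPOINT` only because the `callStopJobOperation` code in this PR reads the drain flag solely on the savepoint path.

```java
/** Hypothetical helper that builds STOP JOB statements (illustration only, not a Flink API). */
public final class StopJobStatements {

    private StopJobStatements() {}

    public static String build(String jobId, boolean withSavepoint, boolean withDrain) {
        // Assumption: in this sketch, drain is only accepted together with a savepoint.
        if (withDrain && !withSavepoint) {
            throw new IllegalArgumentException("WITH DRAIN requires WITH SAVEPOINT in this sketch");
        }
        StringBuilder sql = new StringBuilder("STOP JOB '").append(jobId).append('\'');
        if (withSavepoint) {
            sql.append(" WITH SAVEPOINT");
        }
        if (withDrain) {
            sql.append(" WITH DRAIN");
        }
        return sql.append(';').toString();
    }

    public static void main(String[] args) {
        String jobId = "00000000000000000000000000000001"; // placeholder job ID
        System.out.println(build(jobId, false, false));
        System.out.println(build(jobId, true, false));
        System.out.println(build(jobId, true, true));
    }
}
```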
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   
   This change added tests and can be verified as follows:
   
     - *Added integration tests for stop job statements.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? not documented
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] link3280 commented on pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

Posted by GitBox <gi...@apache.org>.
link3280 commented on PR #21292:
URL: https://github.com/apache/flink/pull/21292#issuecomment-1352758872

   ping @fsk119 


[GitHub] [flink] link3280 commented on pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

Posted by GitBox <gi...@apache.org>.
link3280 commented on PR #21292:
URL: https://github.com/apache/flink/pull/21292#issuecomment-1313035136

   The CI failed due to an unrelated Kafka connector test. In the meantime, we can start reviewing the code. cc @fsk119 


[GitHub] [flink] link3280 commented on pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

Posted by GitBox <gi...@apache.org>.
link3280 commented on PR #21292:
URL: https://github.com/apache/flink/pull/21292#issuecomment-1354328522

   @flinkbot run azure


[GitHub] [flink] fsk119 commented on a diff in pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #21292:
URL: https://github.com/apache/flink/pull/21292#discussion_r1050549461


##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestUtils.java:
##########
@@ -181,4 +186,24 @@ public static void waitUntilJobCanceled(JobID jobId, ClusterClient<?> client)
             Thread.sleep(50);
         }
     }
+
+    /**
+     * Wait until at least one job turns into RUNNING status in the cluster. Applicable for single
+     * job scenarios.
+     *
+     * @param client ClusterClient which could be {@link
+     *     org.apache.flink.test.junit5.InjectClusterClient}.
+     */
+    public static void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
+        Collection<JobStatusMessage> statusMessages = client.listJobs().get();

Review Comment:
   This should be in the while-loop.
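A sketch of the loop shape being asked for, where the job list is re-fetched on every iteration instead of once before the loop. To keep it runnable without Flink, the cluster is modelled by a hypothetical `Supplier<Collection<String>>` of job status names; in `TestUtils` the supplier would correspond to `client.listJobs().get()`, and the 50 ms pause mirrors the one in `waitUntilJobCanceled`.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public final class WaitForRunningJob {

    private WaitForRunningJob() {}

    /**
     * Polls until at least one job reports RUNNING and returns the number of polls.
     * The status snapshot is fetched inside the loop, so each iteration sees fresh data.
     */
    public static int waitUntilJobIsRunning(Supplier<Collection<String>> listJobStatuses)
            throws InterruptedException {
        int polls = 0;
        while (true) {
            polls++;
            Collection<String> statuses = listJobStatuses.get(); // re-fetch on every iteration
            if (statuses.stream().anyMatch("RUNNING"::equals)) {
                return polls;
            }
            Thread.sleep(50);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Fake cluster: the job reports INITIALIZING for two polls, then RUNNING.
        AtomicInteger calls = new AtomicInteger();
        Supplier<Collection<String>> fakeCluster =
                () -> calls.incrementAndGet() < 3 ? List.of("INITIALIZING") : List.of("RUNNING");
        System.out.println("polls needed: " + waitUntilJobIsRunning(fakeCluster));
    }
}
```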



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -405,4 +426,115 @@ private Set<TableInfo> listViews(String catalogName, String databaseName) {
                                                 TableKind.VIEW))
                         .collect(Collectors.toSet()));
     }
+
+    public ResultFetcher callStopJobOperation(
+            OperationHandle operationHandle, StopJobOperation stopJobOperation)
+            throws SqlExecutionException {
+        String jobId = stopJobOperation.getJobId();
+        boolean isWithSavepoint = stopJobOperation.isWithSavepoint();
+        boolean isWithDrain = stopJobOperation.isWithDrain();
+        Duration clientTimeout =
+                Configuration.fromMap(sessionContext.getConfigMap())
+                        .get(ClientOptions.CLIENT_TIMEOUT);
+        Optional<String> savepoint;
+        try {
+            savepoint =
+                    runClusterAction(
+                            operationHandle,
+                            clusterClient -> {
+                                if (isWithSavepoint) {
+                                    // blocking get savepoint path
+                                    try {
+                                        return Optional.of(
+                                                clusterClient
+                                                        .stopWithSavepoint(
+                                                                JobID.fromHexString(jobId),
+                                                                isWithDrain,
+                                                                executionConfig.get(
+                                                                        CheckpointingOptions
+                                                                                .SAVEPOINT_DIRECTORY),
+                                                                SavepointFormatType.DEFAULT)
+                                                        .get(
+                                                                clientTimeout.toMillis(),
+                                                                TimeUnit.MILLISECONDS));
+                                    } catch (Exception e) {
+                                        throw new FlinkException(
+                                                "Could not stop job "
+                                                        + stopJobOperation

Review Comment:
   The exception message is:
   ```
    Could not stop job org.apache.flink.table.operations.command.StopJobOperation@4a951f6b in session aa057ac2-8107-4f14-9fa1-06f637fe2b8e.
   ```
   
   I think we should print the job ID here instead.
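One way to act on this suggestion is to build the message from the job ID string itself. Below is a minimal sketch of the savepoint/cancel dispatch from `callStopJobOperation` with that change. `StopClient` is a hypothetical stand-in for the two `ClusterClient` calls in the diff, and unlike the diff, this sketch also waits on the cancel future with the same timeout (a design choice of the sketch, not something the PR does).

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public final class StopJobDispatch {

    private StopJobDispatch() {}

    /** Hypothetical stand-in for the stop-with-savepoint and cancel calls on a cluster client. */
    public interface StopClient {
        CompletableFuture<String> stopWithSavepoint(String jobId, boolean drain, String savepointDir);

        CompletableFuture<?> cancel(String jobId);
    }

    public static Optional<String> stopJob(
            StopClient client,
            String jobId,
            boolean withSavepoint,
            boolean withDrain,
            String savepointDir,
            Duration timeout) {
        try {
            if (withSavepoint) {
                // Block until the savepoint path is known, bounded by the client timeout.
                return Optional.of(
                        client.stopWithSavepoint(jobId, withDrain, savepointDir)
                                .get(timeout.toMillis(), TimeUnit.MILLISECONDS));
            }
            client.cancel(jobId).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return Optional.empty();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
            // Name the job ID instead of relying on the operation object's toString().
            throw new IllegalStateException("Could not stop job " + jobId + ".", e);
        }
    }
}
```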



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -379,21 +392,62 @@ public void testExecuteSqlWithConfig() {
                         -1,
                         Configuration.fromMap(Collections.singletonMap(key, value)));
 
-        Long token = 0L;
-        List<RowData> settings = new ArrayList<>();
-        while (token != null) {
-            ResultSet result =
-                    service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE);
-            settings.addAll(result.getData());
-            token = result.getNextToken();
-        }
+        List<RowData> settings = fetchAllResults(sessionHandle, operationHandle);
 
         assertThat(settings)
                 .contains(
                         GenericRowData.of(
                                 StringData.fromString(key), StringData.fromString(value)));
     }
 
+    @ParameterizedTest
+    @CsvSource({"WITH SAVEPOINT,true", "WITH SAVEPOINT WITH DRAIN,true", "'',false"})
+    public void testStopJobStatementWithSavepoint(
+            String option,
+            boolean hasSavepoint,
+            @InjectClusterClient RestClusterClient<?> restClusterClient,
+            @TempDir File tmpDir)
+            throws Exception {
+        Configuration configuration = new Configuration(MINI_CLUSTER.getClientConfiguration());
+        configuration.setBoolean(TableConfigOptions.TABLE_DML_SYNC, false);
+        File savepointDir = new File(tmpDir, "savepoints");
+        configuration.set(
+                CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
+
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+        String sourceDdl = "CREATE TABLE source (a STRING) WITH ('connector'='datagen');";
+        String sinkDdl = "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');";
+        String insertSql = "INSERT INTO sink SELECT * FROM source;";
+        String stopSqlTemplate = "STOP JOB '%s' %s;";
+
+        service.executeStatement(sessionHandle, sourceDdl, -1, configuration);
+        service.executeStatement(sessionHandle, sinkDdl, -1, configuration);
+
+        OperationHandle insertOperationHandle =
+                service.executeStatement(sessionHandle, insertSql, -1, configuration);
+
+        List<RowData> results = fetchAllResults(sessionHandle, insertOperationHandle);
+        assertThat(results.size()).isEqualTo(1);
+        String jobId = results.get(0).getString(0).toString();
+
+        TestUtils.waitUntilJobIsRunning(restClusterClient);

Review Comment:
   I think we should wait until all tasks are running, as `SavepointITCase` does:
   ```
   public static void waitUntilAllTasksAreRunning(
           RestClusterClient<?> restClusterClient, JobID jobId) throws Exception {
       // access the REST endpoint of the cluster to determine the state of each
       // ExecutionVertex
   
       final JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
       final JobMessageParameters params = detailsHeaders.getUnresolvedMessageParameters();
       params.jobPathParameter.resolve(jobId);
   
       CommonTestUtils.waitUntilCondition(
               () ->
                       restClusterClient
                               .sendRequest(detailsHeaders, params, EmptyRequestBody.getInstance())
                               .thenApply(
                                       detailsInfo ->
                                               allVerticesRunning(
                                                       detailsInfo.getJobVerticesPerState()))
                               .get());
   }
   
   private static boolean allVerticesRunning(Map<ExecutionState, Integer> states) {
       return states.entrySet().stream()
               .allMatch(
                       entry -> {
                           if (entry.getKey() == ExecutionState.RUNNING) {
                               return entry.getValue() > 0;
                           } else {
                               return entry.getValue() == 0; // no vertices in non-running state.
                           }
                       });
   }
   ```
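The suggested predicate can be checked in isolation. The following is a stdlib-only sketch, assuming a local `State` enum as a stand-in for a subset of `ExecutionState` so it runs without Flink. Because `allMatch` only checks the entries that are present, the rule also returns `true` when the map has no `RUNNING` entry and every other count is zero; the stricter variant guards against that. Whether the REST response can actually omit the `RUNNING` entry is not established here.

```java
import java.util.EnumMap;
import java.util.Map;

public final class VertexStates {

    private VertexStates() {}

    /** Local stand-in for a subset of Flink's ExecutionState values (assumption for illustration). */
    public enum State { CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, FINISHED, CANCELED, FAILED }

    /** Same rule as the suggested allVerticesRunning: RUNNING > 0 and every other count == 0. */
    public static boolean allVerticesRunning(Map<State, Integer> states) {
        return states.entrySet().stream()
                .allMatch(e -> e.getKey() == State.RUNNING ? e.getValue() > 0 : e.getValue() == 0);
    }

    /** Stricter variant: additionally requires a RUNNING entry with a positive count. */
    public static boolean allVerticesRunningStrict(Map<State, Integer> states) {
        return states.getOrDefault(State.RUNNING, 0) > 0 && allVerticesRunning(states);
    }

    public static void main(String[] args) {
        Map<State, Integer> partlyDeployed = new EnumMap<>(State.class);
        partlyDeployed.put(State.RUNNING, 1);
        partlyDeployed.put(State.DEPLOYING, 1);
        System.out.println(allVerticesRunning(partlyDeployed)); // false: one vertex still deploying

        Map<State, Integer> noRunningEntry = new EnumMap<>(State.class);
        noRunningEntry.put(State.CREATED, 0);
        System.out.println(allVerticesRunning(noRunningEntry)); // true: only zero counts are seen
        System.out.println(allVerticesRunningStrict(noRunningEntry)); // false
    }
}
```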



[GitHub] [flink] link3280 commented on pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

Posted by GitBox <gi...@apache.org>.
link3280 commented on PR #21292:
URL: https://github.com/apache/flink/pull/21292#issuecomment-1313003298

   @flinkbot run azure


[GitHub] [flink] link3280 commented on pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

Posted by GitBox <gi...@apache.org>.
link3280 commented on PR #21292:
URL: https://github.com/apache/flink/pull/21292#issuecomment-1320789012

   Rebased onto the latest master to check whether the test failure persists.


[GitHub] [flink] link3280 commented on pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

Posted by GitBox <gi...@apache.org>.
link3280 commented on PR #21292:
URL: https://github.com/apache/flink/pull/21292#issuecomment-1354366573

   @flinkbot run azure


[GitHub] [flink] link3280 commented on a diff in pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

Posted by GitBox <gi...@apache.org>.
link3280 commented on code in PR #21292:
URL: https://github.com/apache/flink/pull/21292#discussion_r1050321324


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -405,4 +425,107 @@ private Set<TableInfo> listViews(String catalogName, String databaseName) {
                                                 TableKind.VIEW))
                         .collect(Collectors.toSet()));
     }
+
+    public ResultFetcher callStopJobOperation(
+            OperationHandle operationHandle, StopJobOperation stopJobOperation)
+            throws SqlExecutionException {
+        String jobId = stopJobOperation.getJobId();
+        boolean isWithSavepoint = stopJobOperation.isWithSavepoint();
+        boolean isWithDrain = stopJobOperation.isWithDrain();
+        Duration clientTimeout =
+                Configuration.fromMap(sessionContext.getConfigMap())
+                        .get(ClientOptions.CLIENT_TIMEOUT);
+        Optional<String> savepoint;
+        try {
+            savepoint =
+                    runClusterAction(
+                            operationHandle,
+                            clusterClient -> {
+                                if (isWithSavepoint) {
+                                    // blocking get savepoint path
+                                    try {
+                                        return Optional.of(
+                                                clusterClient
+                                                        .stopWithSavepoint(
+                                                                JobID.fromHexString(jobId),
+                                                                isWithDrain,
+                                                                executionConfig.get(
+                                                                        CheckpointingOptions
+                                                                                .SAVEPOINT_DIRECTORY),
+                                                                SavepointFormatType.DEFAULT)
+                                                        .get(
+                                                                clientTimeout.toMillis(),
+                                                                TimeUnit.MILLISECONDS));
+                                    } catch (Exception e) {
+                                        throw new FlinkException(
+                                                "Could not stop job "
+                                                        + stopJobOperation
+                                                        + " in session "
+                                                        + operationHandle.getIdentifier()
+                                                        + ".",
+                                                e);
+                                    }
+                                } else {
+                                    clusterClient.cancel(JobID.fromHexString(jobId));
+                                    return Optional.empty();
+                                }
+                            });
+        } catch (Exception e) {
+            throw new SqlExecutionException(
+                    "Could not stop job " + jobId + " for operation " + operationHandle + ".", e);
+        }
+        return new ResultFetcher(
+                operationHandle,
+                ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())),
+                Collections.singletonList(
+                        GenericRowData.of(StringData.fromString(savepoint.orElse("")))));

Review Comment:
   Oops. It looks like something went wrong during the rebase.



[GitHub] [flink] link3280 commented on pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

Posted by GitBox <gi...@apache.org>.
link3280 commented on PR #21292:
URL: https://github.com/apache/flink/pull/21292#issuecomment-1312396064

   @flinkbot run azure


[GitHub] [flink] fsk119 commented on a diff in pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #21292:
URL: https://github.com/apache/flink/pull/21292#discussion_r1049561459


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -405,4 +425,107 @@ private Set<TableInfo> listViews(String catalogName, String databaseName) {
                                                 TableKind.VIEW))
                         .collect(Collectors.toSet()));
     }
+
+    public ResultFetcher callStopJobOperation(
+            OperationHandle operationHandle, StopJobOperation stopJobOperation)
+            throws SqlExecutionException {
+        String jobId = stopJobOperation.getJobId();
+        boolean isWithSavepoint = stopJobOperation.isWithSavepoint();
+        boolean isWithDrain = stopJobOperation.isWithDrain();
+        Duration clientTimeout =
+                Configuration.fromMap(sessionContext.getConfigMap())
+                        .get(ClientOptions.CLIENT_TIMEOUT);
+        Optional<String> savepoint;
+        try {
+            savepoint =
+                    runClusterAction(
+                            operationHandle,
+                            clusterClient -> {
+                                if (isWithSavepoint) {
+                                    // blocking get savepoint path
+                                    try {
+                                        return Optional.of(
+                                                clusterClient
+                                                        .stopWithSavepoint(
+                                                                JobID.fromHexString(jobId),
+                                                                isWithDrain,
+                                                                executionConfig.get(
+                                                                        CheckpointingOptions
+                                                                                .SAVEPOINT_DIRECTORY),
+                                                                SavepointFormatType.DEFAULT)
+                                                        .get(
+                                                                clientTimeout.toMillis(),
+                                                                TimeUnit.MILLISECONDS));
+                                    } catch (Exception e) {
+                                        throw new FlinkException(
+                                                "Could not stop job "
+                                                        + stopJobOperation
+                                                        + " in session "
+                                                        + operationHandle.getIdentifier()
+                                                        + ".",
+                                                e);
+                                    }
+                                } else {
+                                    clusterClient.cancel(JobID.fromHexString(jobId));
+                                    return Optional.empty();
+                                }
+                            });
+        } catch (Exception e) {
+            throw new SqlExecutionException(
+                    "Could not stop job " + jobId + " for operation " + operationHandle + ".", e);
+        }
+        return new ResultFetcher(
+                operationHandle,
+                ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())),
+                Collections.singletonList(
+                        GenericRowData.of(StringData.fromString(savepoint.orElse("")))));

Review Comment:
   The result schema defined in the FLIP is:
   
   ```
   +-----------------------------------------------------------------+
   |                     savepoint_path                              |
   +-----------------------------------------------------------------+
   | hdfs://mycluster/flink-savepoints/savepoint-cca7bc-bb1e257f0dab |
   +-----------------------------------------------------------------+
   ```
   
   I think we'd better not change this unless we have a strong reason.
   
   By the way, the returned schema depends on whether the statement has a `WITH SAVEPOINT` clause.
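A sketch of making the result depend on the `WITH SAVEPOINT` clause. The `savepoint_path` column comes from the FLIP table quoted in this comment; the `result`/`OK` shape for the no-savepoint case is purely an assumption for illustration, and `Result` is a hypothetical stand-in for a `ResolvedSchema` plus its rows.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public final class StopJobResult {

    private StopJobResult() {}

    /** Hypothetical single-column result: a column name plus its rows. */
    public static final class Result {
        public final String column;
        public final List<String> rows;

        Result(String column, List<String> rows) {
            this.column = column;
            this.rows = rows;
        }
    }

    public static Result toResult(boolean withSavepoint, Optional<String> savepointPath) {
        if (withSavepoint) {
            // FLIP-222 schema: one savepoint_path column holding the savepoint location.
            String path =
                    savepointPath.orElseThrow(
                            () -> new IllegalArgumentException("WITH SAVEPOINT but no savepoint path"));
            return new Result("savepoint_path", Collections.singletonList(path));
        }
        // Assumption (not from the FLIP table): without a savepoint, return a plain OK row.
        return new Result("result", Collections.singletonList("OK"));
    }
}
```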



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -379,21 +391,59 @@ public void testExecuteSqlWithConfig() {
                         -1,
                         Configuration.fromMap(Collections.singletonMap(key, value)));
 
-        Long token = 0L;
-        List<RowData> settings = new ArrayList<>();
-        while (token != null) {
-            ResultSet result =
-                    service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE);
-            settings.addAll(result.getData());
-            token = result.getNextToken();
-        }
+        List<RowData> settings = fetchAllResultRows(sessionHandle, operationHandle);
 
         assertThat(settings)
                 .contains(
                         GenericRowData.of(
                                 StringData.fromString(key), StringData.fromString(value)));
     }
 
+    @ParameterizedTest
+    @CsvSource({"WITH SAVEPOINT,true", "WITH SAVEPOINT WITH DRAIN,true", "'',false"})
+    public void testStopJobStatementWithSavepoint(String option, boolean hasSavepoint)
+            throws InterruptedException {
+        Configuration configuration = new Configuration(MINI_CLUSTER.getClientConfiguration());
+        configuration.setBoolean(TableConfigOptions.TABLE_DML_SYNC, false);
+        File savepointDir = new File(tmpDir, "savepoints");
+        configuration.set(
+                CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
+
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+        String sourceDdl = "CREATE TABLE source (a STRING) WITH ('connector'='datagen');";
+        String sinkDdl = "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');";
+        String insertSql = "INSERT INTO sink SELECT * FROM source;";
+        String stopSqlTemplate = "STOP JOB '%s' %s;";
+
+        service.executeStatement(sessionHandle, sourceDdl, -1, configuration);
+        service.executeStatement(sessionHandle, sinkDdl, -1, configuration);
+
+        OperationHandle insertOperationHandle =
+                service.executeStatement(sessionHandle, insertSql, -1, configuration);
+
+        List<RowData> results = fetchAllResultRows(sessionHandle, insertOperationHandle);
+        assertThat(results.size()).isEqualTo(1);
+        String jobId = results.get(0).getString(0).toString();
+
+        // wait till the job turns into running status
+        Thread.sleep(2_000L);

Review Comment:
   Take a look at `HiveServer2EndpointITCase#testExecuteStatementInSyncModeWithRuntimeException2`. You can get a ClusterClient there and query the job status directly instead of sleeping for a fixed time.
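A fixed `Thread.sleep(2_000L)` is either too long or too short depending on CI load; polling the job status until a deadline avoids both. Here is a stdlib-only sketch of that pattern. `PollUntil.waitUntil` is hypothetical (Flink's test utilities already offer `CommonTestUtils.waitUntilCondition`, as in the `SavepointITCase` snippet quoted in this thread), and in the test the condition would wrap a `ClusterClient` status query.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

public final class PollUntil {

    private PollUntil() {}

    /** Re-checks the condition every interval until it holds or the timeout elapses. */
    public static void waitUntil(BooleanSupplier condition, Duration timeout, Duration interval)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            // Compare via subtraction so that nanoTime overflow is handled correctly.
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Condition not met within " + timeout);
            }
            Thread.sleep(interval.toMillis());
        }
    }
}
```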



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -1068,4 +1118,17 @@ private void validateCompletionHints(
         assertThat(service.completeStatement(sessionHandle, incompleteSql, incompleteSql.length()))
                 .isEqualTo(expectedCompletionHints);
     }
+
+    private List<RowData> fetchAllResultRows(

Review Comment:
   nit: rename to `fetchAllResults`.
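For context, the helper being renamed wraps the token-paging loop that the diff removes from `testExecuteSqlWithConfig`: start from token `0`, append each page, and stop once `getNextToken()` returns `null`. Here is a stdlib-only sketch of that loop, where the hypothetical `Page` type and the `LongFunction` stand in for `ResultSet` and `service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

public final class FetchAll {

    private FetchAll() {}

    /** Hypothetical page of rows plus the token of the next page; nextToken is null once exhausted. */
    public static final class Page<T> {
        final List<T> data;
        final Long nextToken;

        public Page(List<T> data, Long nextToken) {
            this.data = data;
            this.nextToken = nextToken;
        }
    }

    /** Starts at token 0 and keeps fetching until no next token is reported. */
    public static <T> List<T> fetchAllResults(LongFunction<Page<T>> fetchPage) {
        List<T> all = new ArrayList<>();
        Long token = 0L;
        while (token != null) {
            Page<T> page = fetchPage.apply(token);
            all.addAll(page.data);
            token = page.nextToken;
        }
        return all;
    }
}
```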



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -379,21 +391,59 @@ public void testExecuteSqlWithConfig() {
                         -1,
                         Configuration.fromMap(Collections.singletonMap(key, value)));
 
-        Long token = 0L;
-        List<RowData> settings = new ArrayList<>();
-        while (token != null) {
-            ResultSet result =
-                    service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE);
-            settings.addAll(result.getData());
-            token = result.getNextToken();
-        }
+        List<RowData> settings = fetchAllResultRows(sessionHandle, operationHandle);
 
         assertThat(settings)
                 .contains(
                         GenericRowData.of(
                                 StringData.fromString(key), StringData.fromString(value)));
     }
 
+    @ParameterizedTest
+    @CsvSource({"WITH SAVEPOINT,true", "WITH SAVEPOINT WITH DRAIN,true", "'',false"})
+    public void testStopJobStatementWithSavepoint(String option, boolean hasSavepoint)

Review Comment:
   nit: `testStopJobStatementWithSavepoint(String option, boolean hasSavepoint, @TempDir Path tmpDir)`
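With the suggested `@TempDir Path tmpDir` parameter, JUnit 5 supplies the temporary directory, and the savepoint directory can be derived with `java.nio` instead of `java.io.File`. Below is a minimal sketch; `savepointDirUri` is a hypothetical helper whose result would be the value set for `CheckpointingOptions.SAVEPOINT_DIRECTORY`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public final class SavepointDirUri {

    private SavepointDirUri() {}

    /** java.nio counterpart of new File(tmpDir, "savepoints").toURI().toString(). */
    public static String savepointDirUri(Path tmpDir) {
        return tmpDir.resolve("savepoints").toUri().toString();
    }

    public static void main(String[] args) throws IOException {
        // Outside JUnit, create the temporary directory by hand.
        Path tmpDir = Files.createTempDirectory("stop-job-test");
        System.out.println(savepointDirUri(tmpDir));
    }
}
```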



[GitHub] [flink] link3280 commented on pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

Posted by GitBox <gi...@apache.org>.
link3280 commented on PR #21292:
URL: https://github.com/apache/flink/pull/21292#issuecomment-1313307201

   @flinkbot run azure


[GitHub] [flink] link3280 commented on pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

Posted by GitBox <gi...@apache.org>.
link3280 commented on PR #21292:
URL: https://github.com/apache/flink/pull/21292#issuecomment-1313164112

   @flinkbot run azure


[GitHub] [flink] link3280 commented on pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

Posted by GitBox <gi...@apache.org>.
link3280 commented on PR #21292:
URL: https://github.com/apache/flink/pull/21292#issuecomment-1356147691

   Updated as suggested. CI is green now. cc @fsk119 


[GitHub] [flink] link3280 commented on a diff in pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

Posted by GitBox <gi...@apache.org>.
link3280 commented on code in PR #21292:
URL: https://github.com/apache/flink/pull/21292#discussion_r1051316673


##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestUtils.java:
##########
@@ -181,4 +186,24 @@ public static void waitUntilJobCanceled(JobID jobId, ClusterClient<?> client)
             Thread.sleep(50);
         }
     }
+
+    /**
+     * Wait until at least one job turns into RUNNING status in the cluster. Applicable for single
+     * job scenarios.
+     *
+     * @param client ClusterClient which could be {@link
+     *     org.apache.flink.test.junit5.InjectClusterClient}.
+     */
+    public static void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception {
+        Collection<JobStatusMessage> statusMessages = client.listJobs().get();

Review Comment:
   That's an obvious bug. Sorry for that.



[GitHub] [flink] fsk119 commented on a diff in pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

fsk119 commented on code in PR #21292:
URL: https://github.com/apache/flink/pull/21292#discussion_r1051520363


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -405,4 +426,115 @@ private Set<TableInfo> listViews(String catalogName, String databaseName) {
                                                 TableKind.VIEW))
                         .collect(Collectors.toSet()));
     }
+
+    public ResultFetcher callStopJobOperation(
+            OperationHandle operationHandle, StopJobOperation stopJobOperation)
+            throws SqlExecutionException {
+        String jobId = stopJobOperation.getJobId();
+        boolean isWithSavepoint = stopJobOperation.isWithSavepoint();
+        boolean isWithDrain = stopJobOperation.isWithDrain();
+        Duration clientTimeout =
+                Configuration.fromMap(sessionContext.getConfigMap())
+                        .get(ClientOptions.CLIENT_TIMEOUT);
+        Optional<String> savepoint;
+        try {
+            savepoint =
+                    runClusterAction(
+                            operationHandle,
+                            clusterClient -> {
+                                if (isWithSavepoint) {
+                                    // blocking get savepoint path
+                                    try {
+                                        return Optional.of(
+                                                clusterClient
+                                                        .stopWithSavepoint(
+                                                                JobID.fromHexString(jobId),
+                                                                isWithDrain,
+                                                                executionConfig.get(
+                                                                        CheckpointingOptions
+                                                                                .SAVEPOINT_DIRECTORY),
+                                                                SavepointFormatType.DEFAULT)
+                                                        .get(
+                                                                clientTimeout.toMillis(),
+                                                                TimeUnit.MILLISECONDS));
+                                    } catch (Exception e) {
+                                        throw new FlinkException(
+                                                "Could not stop job "
+                                                        + stopJobOperation.getJobId()
+                                                        + " in session "
+                                                        + operationHandle.getIdentifier()
+                                                        + ".",
+                                                e);
+                                    }

Review Comment:
   nit: It's a bit redundant to catch the same exception twice with near-identical messages. We can remove this in the next PR.
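A minimal sketch of the single-catch shape the nit points toward, assuming the stop call hands back a `CompletableFuture` (as `stopWithSavepoint` does in the diff) and that one wrapper exception is enough. `StopJobException` and `awaitStop` are hypothetical names; in the diff the method itself declares `SqlExecutionException`.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StopJobSketch {

    /** Hypothetical single wrapper type for every stop-job failure. */
    static class StopJobException extends RuntimeException {
        StopJobException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Blocks on a stop-job future for at most {@code timeout} and converts any failure
     * into one StopJobException. There is exactly one catch site, so callers never see
     * the same failure wrapped twice with near-identical messages.
     */
    static <T> T awaitStop(CompletableFuture<T> stopFuture, String jobId, Duration timeout) {
        String message = "Could not stop job " + jobId + ".";
        try {
            return stopFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new StopJobException(message, e);
        } catch (ExecutionException e) {
            // Unwrap so the cause is the original failure, not the future's wrapper.
            throw new StopJobException(message, e.getCause());
        } catch (TimeoutException e) {
            throw new StopJobException(message, e);
        }
    }

    public static void main(String[] args) {
        // Success: the (fake) savepoint path is returned unchanged.
        System.out.println(
                awaitStop(
                        CompletableFuture.completedFuture("file:/tmp/savepoints/savepoint-1"),
                        "job-1",
                        Duration.ofSeconds(1)));

        // Failure: the original cause survives a single level of wrapping.
        try {
            awaitStop(
                    CompletableFuture.<String>failedFuture(new IllegalStateException("boom")),
                    "job-2",
                    Duration.ofSeconds(1));
        } catch (StopJobException e) {
            System.out.println(e.getMessage() + " cause=" + e.getCause().getMessage());
        }
    }
}
```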



[GitHub] [flink] flinkbot commented on pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

flinkbot commented on PR #21292:
URL: https://github.com/apache/flink/pull/21292#issuecomment-1311356322

   ## CI report:
   
   * 908c2e69bfbeac8634915e90e76284cc78908c31 UNKNOWN
   
   Bot commands: the @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure`: re-run the last Azure build


[GitHub] [flink] link3280 commented on a diff in pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

link3280 commented on code in PR #21292:
URL: https://github.com/apache/flink/pull/21292#discussion_r1051316514


##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -379,21 +392,62 @@ public void testExecuteSqlWithConfig() {
                         -1,
                         Configuration.fromMap(Collections.singletonMap(key, value)));
 
-        Long token = 0L;
-        List<RowData> settings = new ArrayList<>();
-        while (token != null) {
-            ResultSet result =
-                    service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE);
-            settings.addAll(result.getData());
-            token = result.getNextToken();
-        }
+        List<RowData> settings = fetchAllResults(sessionHandle, operationHandle);
 
         assertThat(settings)
                 .contains(
                         GenericRowData.of(
                                 StringData.fromString(key), StringData.fromString(value)));
     }
 
+    @ParameterizedTest
+    @CsvSource({"WITH SAVEPOINT,true", "WITH SAVEPOINT WITH DRAIN,true", "'',false"})
+    public void testStopJobStatementWithSavepoint(
+            String option,
+            boolean hasSavepoint,
+            @InjectClusterClient RestClusterClient<?> restClusterClient,
+            @TempDir File tmpDir)
+            throws Exception {
+        Configuration configuration = new Configuration(MINI_CLUSTER.getClientConfiguration());
+        configuration.setBoolean(TableConfigOptions.TABLE_DML_SYNC, false);
+        File savepointDir = new File(tmpDir, "savepoints");
+        configuration.set(
+                CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
+
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+        String sourceDdl = "CREATE TABLE source (a STRING) WITH ('connector'='datagen');";
+        String sinkDdl = "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');";
+        String insertSql = "INSERT INTO sink SELECT * FROM source;";
+        String stopSqlTemplate = "STOP JOB '%s' %s;";
+
+        service.executeStatement(sessionHandle, sourceDdl, -1, configuration);
+        service.executeStatement(sessionHandle, sinkDdl, -1, configuration);
+
+        OperationHandle insertOperationHandle =
+                service.executeStatement(sessionHandle, insertSql, -1, configuration);
+
+        List<RowData> results = fetchAllResults(sessionHandle, insertOperationHandle);
+        assertThat(results.size()).isEqualTo(1);
+        String jobId = results.get(0).getString(0).toString();
+
+        TestUtils.waitUntilJobIsRunning(restClusterClient);

Review Comment:
   You're right. Thanks for pointing it out!
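The `fetchAllResults` refactor in this hunk replaces an inline token loop. A stand-alone sketch of that loop, assuming only what the removed lines show: fetching starts at token `0`, each page carries its rows and a next token, and a `null` token means there are no more pages. `Page` and the `LongFunction` standing in for `service.fetchResults(...)` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

public class FetchAllResultsSketch {

    /** Hypothetical page of results: the rows plus the token of the next page (null = done). */
    static final class Page<T> {
        final List<T> data;
        final Long nextToken;

        Page(List<T> data, Long nextToken) {
            this.data = data;
            this.nextToken = nextToken;
        }
    }

    /** Starts at token 0 and follows next-tokens until the source reports no further page. */
    static <T> List<T> fetchAllResults(LongFunction<Page<T>> fetchPage) {
        List<T> rows = new ArrayList<>();
        Long token = 0L;
        while (token != null) {
            Page<T> page = fetchPage.apply(token); // auto-unboxed to long
            rows.addAll(page.data);
            token = page.nextToken;
        }
        return rows;
    }

    public static void main(String[] args) {
        // Fake result source with three pages: token 0 -> 1 -> 2 -> null.
        List<List<String>> pages = List.of(List.of("a", "b"), List.of("c"), List.of());
        List<String> rows =
                fetchAllResults(
                        token ->
                                new Page<String>(
                                        pages.get((int) token),
                                        token + 1 < pages.size() ? token + 1 : null));
        System.out.println(rows); // [a, b, c]
    }
}
```

The loop only terminates once a `null` token comes back, so it suits bounded results such as the single job-ID row the test expects from the asynchronous `INSERT`.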


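To see the three statements the parameterized test issues, the template and the `@CsvSource` option column can be expanded by hand. In JUnit 5's `@CsvSource`, `''` denotes an empty string, so the last row yields a plain `STOP JOB` with a stray space before the semicolon. The class and method names below are hypothetical; the template and options are copied from the test.

```java
import java.util.ArrayList;
import java.util.List;

public class StopJobStatementSketch {

    /** Same template as the test: STOP JOB '<job id>' <options>; */
    static final String STOP_SQL_TEMPLATE = "STOP JOB '%s' %s;";

    /** Option column of the three @CsvSource rows ('' becomes the empty string). */
    static final String[] OPTIONS = {"WITH SAVEPOINT", "WITH SAVEPOINT WITH DRAIN", ""};

    static List<String> stopStatements(String jobId) {
        List<String> statements = new ArrayList<>();
        for (String option : OPTIONS) {
            statements.add(String.format(STOP_SQL_TEMPLATE, jobId, option));
        }
        return statements;
    }

    public static void main(String[] args) {
        // Placeholder job ID: 32 hex characters, the shape of a Flink JobID in hex form.
        stopStatements("00000000000000000000000000000000").forEach(System.out::println);
    }
}
```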

[GitHub] [flink] link3280 commented on pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

link3280 commented on PR #21292:
URL: https://github.com/apache/flink/pull/21292#issuecomment-1314918042

   @flinkbot run azure


[GitHub] [flink] link3280 commented on pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

link3280 commented on PR #21292:
URL: https://github.com/apache/flink/pull/21292#issuecomment-1313103656

   @flinkbot run azure


[GitHub] [flink] link3280 commented on pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

link3280 commented on PR #21292:
URL: https://github.com/apache/flink/pull/21292#issuecomment-1314826387

   @flinkbot run azure


[GitHub] [flink] link3280 commented on pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

link3280 commented on PR #21292:
URL: https://github.com/apache/flink/pull/21292#issuecomment-1316434664

   @flinkbot run azure


[GitHub] [flink] fsk119 merged pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

fsk119 merged PR #21292:
URL: https://github.com/apache/flink/pull/21292

