You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/15 12:40:52 UTC

[GitHub] [flink] fsk119 commented on a diff in pull request #21292: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService

fsk119 commented on code in PR #21292:
URL: https://github.com/apache/flink/pull/21292#discussion_r1049561459


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -405,4 +425,107 @@ private Set<TableInfo> listViews(String catalogName, String databaseName) {
                                                 TableKind.VIEW))
                         .collect(Collectors.toSet()));
     }
+
+    public ResultFetcher callStopJobOperation(
+            OperationHandle operationHandle, StopJobOperation stopJobOperation)
+            throws SqlExecutionException {
+        String jobId = stopJobOperation.getJobId();
+        boolean isWithSavepoint = stopJobOperation.isWithSavepoint();
+        boolean isWithDrain = stopJobOperation.isWithDrain();
+        Duration clientTimeout =
+                Configuration.fromMap(sessionContext.getConfigMap())
+                        .get(ClientOptions.CLIENT_TIMEOUT);
+        Optional<String> savepoint;
+        try {
+            savepoint =
+                    runClusterAction(
+                            operationHandle,
+                            clusterClient -> {
+                                if (isWithSavepoint) {
+                                    // blocking get savepoint path
+                                    try {
+                                        return Optional.of(
+                                                clusterClient
+                                                        .stopWithSavepoint(
+                                                                JobID.fromHexString(jobId),
+                                                                isWithDrain,
+                                                                executionConfig.get(
+                                                                        CheckpointingOptions
+                                                                                .SAVEPOINT_DIRECTORY),
+                                                                SavepointFormatType.DEFAULT)
+                                                        .get(
+                                                                clientTimeout.toMillis(),
+                                                                TimeUnit.MILLISECONDS));
+                                    } catch (Exception e) {
+                                        throw new FlinkException(
+                                                "Could not stop job "
+                                                        + stopJobOperation
+                                                        + " in session "
+                                                        + operationHandle.getIdentifier()
+                                                        + ".",
+                                                e);
+                                    }
+                                } else {
+                                    clusterClient.cancel(JobID.fromHexString(jobId));
+                                    return Optional.empty();
+                                }
+                            });
+        } catch (Exception e) {
+            throw new SqlExecutionException(
+                    "Could not stop job " + jobId + " for operation " + operationHandle + ".", e);
+        }
+        return new ResultFetcher(
+                operationHandle,
+                ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())),
+                Collections.singletonList(
+                        GenericRowData.of(StringData.fromString(savepoint.orElse("")))));

Review Comment:
   The returned schema in the FLIP is 
   
   ```
   
   +-----------------------------------------------------------------|
   |                     savepoint_path                              |
   +-----------------------------------------------------------------|
   | hdfs://mycluster/flink-savepoints/savepoint-cca7bc-bb1e257f0dab |
   +-----------------------------------------------------------------|
   ```
   
   I think we'd better don't change this if we don't have any strong reason.
   
   By the way, the returned schema depends on whether the statement has a `WITH SAVEPOINT` clause.



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -379,21 +391,59 @@ public void testExecuteSqlWithConfig() {
                         -1,
                         Configuration.fromMap(Collections.singletonMap(key, value)));
 
-        Long token = 0L;
-        List<RowData> settings = new ArrayList<>();
-        while (token != null) {
-            ResultSet result =
-                    service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE);
-            settings.addAll(result.getData());
-            token = result.getNextToken();
-        }
+        List<RowData> settings = fetchAllResultRows(sessionHandle, operationHandle);
 
         assertThat(settings)
                 .contains(
                         GenericRowData.of(
                                 StringData.fromString(key), StringData.fromString(value)));
     }
 
+    @ParameterizedTest
+    @CsvSource({"WITH SAVEPOINT,true", "WITH SAVEPOINT WITH DRAIN,true", "'',false"})
+    public void testStopJobStatementWithSavepoint(String option, boolean hasSavepoint)
+            throws InterruptedException {
+        Configuration configuration = new Configuration(MINI_CLUSTER.getClientConfiguration());
+        configuration.setBoolean(TableConfigOptions.TABLE_DML_SYNC, false);
+        File savepointDir = new File(tmpDir, "savepoints");
+        configuration.set(
+                CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
+
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+        String sourceDdl = "CREATE TABLE source (a STRING) WITH ('connector'='datagen');";
+        String sinkDdl = "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');";
+        String insertSql = "INSERT INTO sink SELECT * FROM source;";
+        String stopSqlTemplate = "STOP JOB '%s' %s;";
+
+        service.executeStatement(sessionHandle, sourceDdl, -1, configuration);
+        service.executeStatement(sessionHandle, sinkDdl, -1, configuration);
+
+        OperationHandle insertOperationHandle =
+                service.executeStatement(sessionHandle, insertSql, -1, configuration);
+
+        List<RowData> results = fetchAllResultRows(sessionHandle, insertOperationHandle);
+        assertThat(results.size()).isEqualTo(1);
+        String jobId = results.get(0).getString(0).toString();
+
+        // wait till the job turns into running status
+        Thread.sleep(2_000L);

Review Comment:
   Take a look at `HiveServer2EndpiontITCase#testExecuteStatementInSyncModeWithRuntimeException2`.  You can get a ClusterClient and query the job status directly.



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -1068,4 +1118,17 @@ private void validateCompletionHints(
         assertThat(service.completeStatement(sessionHandle, incompleteSql, incompleteSql.length()))
                 .isEqualTo(expectedCompletionHints);
     }
+
+    private List<RowData> fetchAllResultRows(

Review Comment:
   nit:rename to fetchAllResults



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -379,21 +391,59 @@ public void testExecuteSqlWithConfig() {
                         -1,
                         Configuration.fromMap(Collections.singletonMap(key, value)));
 
-        Long token = 0L;
-        List<RowData> settings = new ArrayList<>();
-        while (token != null) {
-            ResultSet result =
-                    service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE);
-            settings.addAll(result.getData());
-            token = result.getNextToken();
-        }
+        List<RowData> settings = fetchAllResultRows(sessionHandle, operationHandle);
 
         assertThat(settings)
                 .contains(
                         GenericRowData.of(
                                 StringData.fromString(key), StringData.fromString(value)));
     }
 
+    @ParameterizedTest
+    @CsvSource({"WITH SAVEPOINT,true", "WITH SAVEPOINT WITH DRAIN,true", "'',false"})
+    public void testStopJobStatementWithSavepoint(String option, boolean hasSavepoint)

Review Comment:
   nit: `testStopJobStatementWithSavepoint(String option, boolean hasSavepoint, @TempDir Path tmpDir)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org