Posted to commits@flink.apache.org by sh...@apache.org on 2022/12/18 03:53:09 UTC

[flink] branch master updated: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService (#21292)

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 934edd37dee [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService (#21292)
934edd37dee is described below

commit 934edd37dee44fc7c8708f09d7b715cd5e8b3404
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Sun Dec 18 11:53:00 2022 +0800

    [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService (#21292)
    
    This closes #21292
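    
    The statement forms exercised by the new tests (a summary of the
    syntax as used in this change, not a full grammar reference):
    
        STOP JOB '<job-id>';                            -- cancel the job
        STOP JOB '<job-id>' WITH SAVEPOINT;             -- stop with a savepoint; returns savepoint_path
        STOP JOB '<job-id>' WITH SAVEPOINT WITH DRAIN;  -- drain the pipeline before taking the savepoint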
---
 .../service/operation/OperationExecutor.java       | 132 +++++++++++++++++++++
 .../table/gateway/service/utils/Constants.java     |   2 +
 .../gateway/service/SqlGatewayServiceITCase.java   | 104 +++++++++++++---
 .../java/org/apache/flink/test/util/TestUtils.java |  43 +++++++
 4 files changed, 263 insertions(+), 18 deletions(-)

diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 5670abfb8be..1d7be8fdf2c 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -19,7 +19,16 @@
 package org.apache.flink.table.gateway.service.operation;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.ClientOptions;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.table.api.CatalogNotExistException;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
@@ -55,25 +64,32 @@ import org.apache.flink.table.operations.UseOperation;
 import org.apache.flink.table.operations.command.AddJarOperation;
 import org.apache.flink.table.operations.command.ResetOperation;
 import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.command.StopJobOperation;
 import org.apache.flink.table.operations.ddl.AlterOperation;
 import org.apache.flink.table.operations.ddl.CreateOperation;
 import org.apache.flink.table.operations.ddl.DropOperation;
 import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.gateway.service.utils.Constants.COMPLETION_HINTS;
 import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID;
+import static org.apache.flink.table.gateway.service.utils.Constants.SAVEPOINT_PATH;
 import static org.apache.flink.table.gateway.service.utils.Constants.SET_KEY;
 import static org.apache.flink.table.gateway.service.utils.Constants.SET_VALUE;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -86,10 +102,13 @@ public class OperationExecutor {
     private final SessionContext sessionContext;
     private final Configuration executionConfig;
 
+    private final ClusterClientServiceLoader clusterClientServiceLoader;
+
     @VisibleForTesting
     public OperationExecutor(SessionContext context, Configuration executionConfig) {
         this.sessionContext = context;
         this.executionConfig = executionConfig;
+        this.clusterClientServiceLoader = new DefaultClusterClientServiceLoader();
     }
 
     public ResultFetcher configureSession(OperationHandle handle, String statement) {
@@ -163,6 +182,8 @@ public class OperationExecutor {
         } else if (op instanceof QueryOperation) {
             TableResultInternal result = tableEnv.executeInternal(op);
             return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
+        } else if (op instanceof StopJobOperation) {
+            return callStopJobOperation(handle, (StopJobOperation) op);
         } else {
             return callOperation(tableEnv, handle, op);
         }
@@ -405,4 +426,115 @@ public class OperationExecutor {
                                                 TableKind.VIEW))
                         .collect(Collectors.toSet()));
     }
+
+    public ResultFetcher callStopJobOperation(
+            OperationHandle operationHandle, StopJobOperation stopJobOperation)
+            throws SqlExecutionException {
+        String jobId = stopJobOperation.getJobId();
+        boolean isWithSavepoint = stopJobOperation.isWithSavepoint();
+        boolean isWithDrain = stopJobOperation.isWithDrain();
+        Duration clientTimeout =
+                Configuration.fromMap(sessionContext.getConfigMap())
+                        .get(ClientOptions.CLIENT_TIMEOUT);
+        Optional<String> savepoint;
+        try {
+            savepoint =
+                    runClusterAction(
+                            operationHandle,
+                            clusterClient -> {
+                                if (isWithSavepoint) {
+                                    // blocking get savepoint path
+                                    try {
+                                        return Optional.of(
+                                                clusterClient
+                                                        .stopWithSavepoint(
+                                                                JobID.fromHexString(jobId),
+                                                                isWithDrain,
+                                                                executionConfig.get(
+                                                                        CheckpointingOptions
+                                                                                .SAVEPOINT_DIRECTORY),
+                                                                SavepointFormatType.DEFAULT)
+                                                        .get(
+                                                                clientTimeout.toMillis(),
+                                                                TimeUnit.MILLISECONDS));
+                                    } catch (Exception e) {
+                                        throw new FlinkException(
+                                                "Could not stop job "
+                                                        + stopJobOperation.getJobId()
+                                                        + " in session "
+                                                        + operationHandle.getIdentifier()
+                                                        + ".",
+                                                e);
+                                    }
+                                } else {
+                                    clusterClient.cancel(JobID.fromHexString(jobId));
+                                    return Optional.empty();
+                                }
+                            });
+        } catch (Exception e) {
+            throw new SqlExecutionException(
+                    "Could not stop job " + jobId + " for operation " + operationHandle + ".", e);
+        }
+        if (isWithSavepoint) {
+            return new ResultFetcher(
+                    operationHandle,
+                    ResolvedSchema.of(Column.physical(SAVEPOINT_PATH, DataTypes.STRING())),
+                    Collections.singletonList(
+                            GenericRowData.of(StringData.fromString(savepoint.orElse("")))));
+        } else {
+            return new ResultFetcher(
+                    operationHandle,
+                    TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
+                    CollectionUtil.iteratorToList(
+                            TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+        }
+    }
+
+    /**
+     * Retrieves the {@link ClusterClient} from the session and runs the given {@link ClusterAction}
+     * against it.
+     *
+     * @param operationHandle the specified operation handle
+     * @param clusterAction the cluster action to run against the retrieved {@link ClusterClient}.
+     * @param <ClusterID> type of the cluster id
+     * @param <Result> type of the result
+     * @throws FlinkException if something goes wrong
+     */
+    private <ClusterID, Result> Result runClusterAction(
+            OperationHandle operationHandle, ClusterAction<ClusterID, Result> clusterAction)
+            throws FlinkException {
+        final Configuration configuration = Configuration.fromMap(sessionContext.getConfigMap());
+        final ClusterClientFactory<ClusterID> clusterClientFactory =
+                clusterClientServiceLoader.getClusterClientFactory(configuration);
+
+        final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
+        Preconditions.checkNotNull(
+                clusterId, "No cluster ID found for operation " + operationHandle);
+
+        try (final ClusterDescriptor<ClusterID> clusterDescriptor =
+                        clusterClientFactory.createClusterDescriptor(configuration);
+                final ClusterClient<ClusterID> clusterClient =
+                        clusterDescriptor.retrieve(clusterId).getClusterClient()) {
+            return clusterAction.runAction(clusterClient);
+        }
+    }
+
+    /**
+     * Internal interface to encapsulate cluster actions which are executed via the {@link
+     * ClusterClient}.
+     *
+     * @param <ClusterID> type of the cluster id
+     * @param <Result> type of the result
+     */
+    @FunctionalInterface
+    private interface ClusterAction<ClusterID, Result> {
+
+        /**
+         * Run the cluster action with the given {@link ClusterClient}.
+         *
+         * @param clusterClient to run the cluster action against
+         * @throws FlinkException if something goes wrong
+         */
+        Result runAction(ClusterClient<ClusterID> clusterClient) throws FlinkException;
+    }
 }
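
The runClusterAction/ClusterAction pair added above mirrors the pattern Flink's CLI uses for job management commands: resolve a ClusterClientFactory from the configuration, retrieve the cluster client, run one action, and close both resources afterwards. A minimal standalone sketch of the same pattern, assuming a standalone session cluster reachable at localhost:8081 (the class name and endpoint are illustrative, not part of this commit):

    import org.apache.flink.client.deployment.ClusterClientFactory;
    import org.apache.flink.client.deployment.ClusterDescriptor;
    import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
    import org.apache.flink.client.deployment.StandaloneClusterId;
    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.configuration.Configuration;

    public final class ClusterActionSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setString("execution.target", "remote"); // selects the standalone client factory
            conf.setString("rest.address", "localhost");  // assumption: local session cluster
            conf.setInteger("rest.port", 8081);

            ClusterClientFactory<StandaloneClusterId> factory =
                    new DefaultClusterClientServiceLoader().getClusterClientFactory(conf);
            StandaloneClusterId clusterId = factory.getClusterId(conf);

            // Same try-with-resources shape as runClusterAction: both the
            // descriptor and the client are closed once the action has run.
            try (ClusterDescriptor<StandaloneClusterId> descriptor =
                            factory.createClusterDescriptor(conf);
                    ClusterClient<StandaloneClusterId> client =
                            descriptor.retrieve(clusterId).getClusterClient()) {
                // The "action": list the jobs known to the cluster.
                client.listJobs()
                        .get()
                        .forEach(job -> System.out.println(job.getJobId() + " " + job.getJobState()));
            }
        }
    }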
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
index 0b2f96899e2..2f40d74b5b4 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
@@ -25,4 +25,6 @@ public class Constants {
     public static final String SET_KEY = "key";
     public static final String SET_VALUE = "value";
     public static final String COMPLETION_HINTS = "hints";
+
+    public static final String SAVEPOINT_PATH = "savepoint_path";
 }
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 23ccdabade1..d95d7d486c3 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -18,11 +18,16 @@
 
 package org.apache.flink.table.gateway.service;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
@@ -55,7 +60,10 @@ import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
 import org.apache.flink.table.planner.runtime.batch.sql.TestModule;
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
 import org.apache.flink.table.planner.utils.TableFunc0;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.UserClassLoaderJarTestUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -64,11 +72,16 @@ import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.assertj.core.api.Condition;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 
 import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -97,7 +110,6 @@ import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_
 import static org.apache.flink.types.RowKind.DELETE;
 import static org.apache.flink.types.RowKind.INSERT;
 import static org.apache.flink.types.RowKind.UPDATE_AFTER;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 
@@ -105,8 +117,17 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 public class SqlGatewayServiceITCase extends AbstractTestBase {
 
     @RegisterExtension
+    @Order(1)
+    public static final MiniClusterExtension MINI_CLUSTER =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    @RegisterExtension
+    @Order(2)
     public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
-            new SqlGatewayServiceExtension();
+            new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration);
 
     private static SessionManager sessionManager;
     private static SqlGatewayServiceImpl service;
@@ -291,15 +312,8 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 Duration.ofSeconds(10),
                 "Failed to wait operation finish.");
 
-        Long token = 0L;
         List<RowData> expectedData = getDefaultResultSet().getData();
-        List<RowData> actualData = new ArrayList<>();
-        while (token != null) {
-            ResultSet currentResult =
-                    service.fetchResults(sessionHandle, operationHandle, token, 1);
-            actualData.addAll(checkNotNull(currentResult.getData()));
-            token = currentResult.getNextToken();
-        }
+        List<RowData> actualData = fetchAllResults(sessionHandle, operationHandle);
         assertThat(actualData).isEqualTo(expectedData);
 
         service.closeOperation(sessionHandle, operationHandle);
@@ -379,14 +393,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                         -1,
                         Configuration.fromMap(Collections.singletonMap(key, value)));
 
-        Long token = 0L;
-        List<RowData> settings = new ArrayList<>();
-        while (token != null) {
-            ResultSet result =
-                    service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE);
-            settings.addAll(result.getData());
-            token = result.getNextToken();
-        }
+        List<RowData> settings = fetchAllResults(sessionHandle, operationHandle);
 
         assertThat(settings)
                 .contains(
@@ -394,6 +401,54 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                                 StringData.fromString(key), StringData.fromString(value)));
     }
 
+    @ParameterizedTest
+    @CsvSource({"WITH SAVEPOINT,true", "WITH SAVEPOINT WITH DRAIN,true", "'',false"})
+    public void testStopJobStatementWithSavepoint(
+            String option,
+            boolean hasSavepoint,
+            @InjectClusterClient RestClusterClient<?> restClusterClient,
+            @TempDir File tmpDir)
+            throws Exception {
+        Configuration configuration = new Configuration(MINI_CLUSTER.getClientConfiguration());
+        configuration.setBoolean(TableConfigOptions.TABLE_DML_SYNC, false);
+        File savepointDir = new File(tmpDir, "savepoints");
+        configuration.set(
+                CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
+
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+        String sourceDdl = "CREATE TABLE source (a STRING) WITH ('connector'='datagen');";
+        String sinkDdl = "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');";
+        String insertSql = "INSERT INTO sink SELECT * FROM source;";
+        String stopSqlTemplate = "STOP JOB '%s' %s;";
+
+        service.executeStatement(sessionHandle, sourceDdl, -1, configuration);
+        service.executeStatement(sessionHandle, sinkDdl, -1, configuration);
+
+        OperationHandle insertOperationHandle =
+                service.executeStatement(sessionHandle, insertSql, -1, configuration);
+
+        List<RowData> results = fetchAllResults(sessionHandle, insertOperationHandle);
+        assertThat(results.size()).isEqualTo(1);
+        String jobId = results.get(0).getString(0).toString();
+
+        TestUtils.waitUntilAllTasksAreRunning(restClusterClient, JobID.fromHexString(jobId));
+
+        String stopSql = String.format(stopSqlTemplate, jobId, option);
+        OperationHandle stopOperationHandle =
+                service.executeStatement(sessionHandle, stopSql, -1, configuration);
+
+        List<RowData> stopResults = fetchAllResults(sessionHandle, stopOperationHandle);
+        assertThat(stopResults.size()).isEqualTo(1);
+        if (hasSavepoint) {
+            String savepoint = stopResults.get(0).getString(0).toString();
+            Path savepointPath = Paths.get(savepoint);
+            assertThat(savepointPath.getFileName().toString()).startsWith("savepoint-");
+        } else {
+            assertThat(stopResults.get(0).getString(0).toString()).isEqualTo("OK");
+        }
+    }
+
     @Test
     public void testGetOperationSchemaUntilOperationIsReady() throws Exception {
         runGetOperationSchemaUntilOperationIsReadyOrError(
@@ -1068,4 +1123,17 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
         assertThat(service.completeStatement(sessionHandle, incompleteSql, incompleteSql.length()))
                 .isEqualTo(expectedCompletionHints);
     }
+
+    private List<RowData> fetchAllResults(
+            SessionHandle sessionHandle, OperationHandle operationHandle) {
+        Long token = 0L;
+        List<RowData> results = new ArrayList<>();
+        while (token != null) {
+            ResultSet result =
+                    service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE);
+            results.addAll(result.getData());
+            token = result.getNextToken();
+        }
+        return results;
+    }
 }
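
The fetchAllResults helper above encodes the gateway's pagination contract: keep fetching with the token returned by the previous page until it comes back null. A sketch of the same loop from calling code (service and sessionHandle as set up in this test; the statement and page size are arbitrary):

    OperationHandle handle =
            service.executeStatement(sessionHandle, "SHOW TABLES;", -1, new Configuration());
    Long token = 0L;
    while (token != null) {
        // maxRows is an upper bound per page; termination is signaled by a null token.
        ResultSet page = service.fetchResults(sessionHandle, handle, token, 100);
        page.getData().forEach(System.out::println);
        token = page.getNextToken();
    }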
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestUtils.java
index a5b3d158511..4adfbfe9765 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestUtils.java
@@ -22,13 +22,19 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.util.ExceptionUtils;
@@ -42,6 +48,7 @@ import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.Comparator;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 
@@ -181,4 +188,40 @@ public class TestUtils {
             Thread.sleep(50);
         }
     }
+
+    /**
+     * Waits until all tasks of a job have turned into RUNNING state.
+     *
+     * @param restClusterClient RestClusterClient which could be injected via {@link
+     *     org.apache.flink.test.junit5.InjectClusterClient}.
+     */
+    public static void waitUntilAllTasksAreRunning(
+            RestClusterClient<?> restClusterClient, JobID jobId) throws Exception {
+        // access the REST endpoint of the cluster to determine the state of each ExecutionVertex
+        final JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
+        final JobMessageParameters params = detailsHeaders.getUnresolvedMessageParameters();
+        params.jobPathParameter.resolve(jobId);
+
+        CommonTestUtils.waitUntilCondition(
+                () ->
+                        restClusterClient
+                                .sendRequest(detailsHeaders, params, EmptyRequestBody.getInstance())
+                                .thenApply(
+                                        detailsInfo ->
+                                                allVerticesRunning(
+                                                        detailsInfo.getJobVerticesPerState()))
+                                .get());
+    }
+
+    private static boolean allVerticesRunning(Map<ExecutionState, Integer> states) {
+        return states.entrySet().stream()
+                .allMatch(
+                        entry -> {
+                            if (entry.getKey() == ExecutionState.RUNNING) {
+                                return entry.getValue() > 0;
+                            } else {
+                                return entry.getValue() == 0; // no vertices in non-running state.
+                            }
+                        });
+    }
 }
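
A note on the allVerticesRunning predicate above: it accepts a per-state vertex histogram only when RUNNING has a positive count and every other reported state counts zero (an empty histogram would be vacuously accepted). A quick illustration (a fragment; the method is private to TestUtils):

    Map<ExecutionState, Integer> ready = new EnumMap<>(ExecutionState.class);
    ready.put(ExecutionState.RUNNING, 3);     // only RUNNING has a count -> accepted
    Map<ExecutionState, Integer> pending = new EnumMap<>(ExecutionState.class);
    pending.put(ExecutionState.RUNNING, 2);
    pending.put(ExecutionState.DEPLOYING, 1); // a non-RUNNING state has a count -> rejected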