Posted to commits@flink.apache.org by sh...@apache.org on 2022/12/18 03:53:09 UTC
[flink] branch master updated: [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService (#21292)
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 934edd37dee [FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService (#21292)
934edd37dee is described below
commit 934edd37dee44fc7c8708f09d7b715cd5e8b3404
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Sun Dec 18 11:53:00 2022 +0800
[FLINK-28617][SQL Gateway] Support stop job statement in SqlGatewayService (#21292)
This closes #21292
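For reference, a minimal sketch (session handle, job ID, and configuration
as in the new SqlGatewayServiceITCase test) of how the statement is issued
through the gateway API:

    // Illustrative sketch only: stop a running job, taking a savepoint
    // first. Appending WITH DRAIN would additionally flush in-flight data
    // before stopping.
    OperationHandle stopHandle =
            service.executeStatement(
                    sessionHandle,
                    String.format("STOP JOB '%s' WITH SAVEPOINT;", jobId),
                    -1,
                    configuration);
    // WITH SAVEPOINT the result is a single "savepoint_path" column;
    // without it the statement simply returns "OK".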
---
.../service/operation/OperationExecutor.java | 132 +++++++++++++++++++++
.../table/gateway/service/utils/Constants.java | 2 +
.../gateway/service/SqlGatewayServiceITCase.java | 104 +++++++++++++---
.../java/org/apache/flink/test/util/TestUtils.java | 43 +++++++
4 files changed, 263 insertions(+), 18 deletions(-)
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 5670abfb8be..1d7be8fdf2c 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -19,7 +19,16 @@
package org.apache.flink.table.gateway.service.operation;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.ClientOptions;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.table.api.CatalogNotExistException;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
@@ -55,25 +64,32 @@ import org.apache.flink.table.operations.UseOperation;
import org.apache.flink.table.operations.command.AddJarOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.command.StopJobOperation;
import org.apache.flink.table.operations.ddl.AlterOperation;
import org.apache.flink.table.operations.ddl.CreateOperation;
import org.apache.flink.table.operations.ddl.DropOperation;
import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.flink.table.gateway.service.utils.Constants.COMPLETION_HINTS;
import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID;
+import static org.apache.flink.table.gateway.service.utils.Constants.SAVEPOINT_PATH;
import static org.apache.flink.table.gateway.service.utils.Constants.SET_KEY;
import static org.apache.flink.table.gateway.service.utils.Constants.SET_VALUE;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -86,10 +102,13 @@ public class OperationExecutor {
private final SessionContext sessionContext;
private final Configuration executionConfig;
+ private final ClusterClientServiceLoader clusterClientServiceLoader;
+
@VisibleForTesting
public OperationExecutor(SessionContext context, Configuration executionConfig) {
this.sessionContext = context;
this.executionConfig = executionConfig;
+ this.clusterClientServiceLoader = new DefaultClusterClientServiceLoader();
}
public ResultFetcher configureSession(OperationHandle handle, String statement) {
@@ -163,6 +182,8 @@ public class OperationExecutor {
} else if (op instanceof QueryOperation) {
TableResultInternal result = tableEnv.executeInternal(op);
return new ResultFetcher(handle, result.getResolvedSchema(), result.collectInternal());
+ } else if (op instanceof StopJobOperation) {
+ return callStopJobOperation(handle, (StopJobOperation) op);
} else {
return callOperation(tableEnv, handle, op);
}
@@ -405,4 +426,115 @@ public class OperationExecutor {
TableKind.VIEW))
.collect(Collectors.toSet()));
}
+
+ public ResultFetcher callStopJobOperation(
+ OperationHandle operationHandle, StopJobOperation stopJobOperation)
+ throws SqlExecutionException {
+ String jobId = stopJobOperation.getJobId();
+ boolean isWithSavepoint = stopJobOperation.isWithSavepoint();
+ boolean isWithDrain = stopJobOperation.isWithDrain();
+ Duration clientTimeout =
+ Configuration.fromMap(sessionContext.getConfigMap())
+ .get(ClientOptions.CLIENT_TIMEOUT);
+ Optional<String> savepoint;
+ try {
+ savepoint =
+ runClusterAction(
+ operationHandle,
+ clusterClient -> {
+ if (isWithSavepoint) {
+ // blocking get savepoint path
+ try {
+ return Optional.of(
+ clusterClient
+ .stopWithSavepoint(
+ JobID.fromHexString(jobId),
+ isWithDrain,
+ executionConfig.get(
+ CheckpointingOptions
+ .SAVEPOINT_DIRECTORY),
+ SavepointFormatType.DEFAULT)
+ .get(
+ clientTimeout.toMillis(),
+ TimeUnit.MILLISECONDS));
+ } catch (Exception e) {
+ throw new FlinkException(
+ "Could not stop job "
+ + stopJobOperation.getJobId()
+ + " in session "
+ + operationHandle.getIdentifier()
+ + ".",
+ e);
+ }
+ } else {
+ clusterClient.cancel(JobID.fromHexString(jobId));
+ return Optional.empty();
+ }
+ });
+ } catch (Exception e) {
+ throw new SqlExecutionException(
+ "Could not stop job " + jobId + " for operation " + operationHandle + ".", e);
+ }
+ if (isWithSavepoint) {
+ return new ResultFetcher(
+ operationHandle,
+ ResolvedSchema.of(Column.physical(SAVEPOINT_PATH, DataTypes.STRING())),
+ Collections.singletonList(
+ GenericRowData.of(StringData.fromString(savepoint.orElse("")))));
+ } else {
+ return new ResultFetcher(
+ operationHandle,
+ TableResultInternal.TABLE_RESULT_OK.getResolvedSchema(),
+ CollectionUtil.iteratorToList(
+ TableResultInternal.TABLE_RESULT_OK.collectInternal()));
+ }
+ }
+
+ /**
+ * Retrieves the {@link ClusterClient} from the session and runs the given {@link ClusterAction}
+ * against it.
+ *
+ * @param operationHandle the specified operation handle
+ * @param clusterAction the cluster action to run against the retrieved {@link ClusterClient}.
+ * @param <ClusterID> type of the cluster id
+ * @param <Result> type of the result
+ * @throws FlinkException if something goes wrong
+ */
+ private <ClusterID, Result> Result runClusterAction(
+ OperationHandle operationHandle, ClusterAction<ClusterID, Result> clusterAction)
+ throws FlinkException {
+ final Configuration configuration = Configuration.fromMap(sessionContext.getConfigMap());
+ final ClusterClientFactory<ClusterID> clusterClientFactory =
+ clusterClientServiceLoader.getClusterClientFactory(configuration);
+
+ final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
+ Preconditions.checkNotNull(
+ clusterId, "No cluster ID found for operation " + operationHandle);
+
+ try (final ClusterDescriptor<ClusterID> clusterDescriptor =
+ clusterClientFactory.createClusterDescriptor(configuration);
+ final ClusterClient<ClusterID> clusterClient =
+ clusterDescriptor.retrieve(clusterId).getClusterClient()) {
+ return clusterAction.runAction(clusterClient);
+ }
+ }
+
+ /**
+ * Internal interface to encapsulate cluster actions which are executed via the {@link
+ * ClusterClient}.
+ *
+ * @param <ClusterID> type of the cluster id
+ * @param <Result> type of the result
+ */
+ @FunctionalInterface
+ private interface ClusterAction<ClusterID, Result> {
+
+ /**
+ * Run the cluster action with the given {@link ClusterClient}.
+ *
+ * @param clusterClient to run the cluster action against
+ * @throws FlinkException if something goes wrong
+ */
+ Result runAction(ClusterClient<ClusterID> clusterClient) throws FlinkException;
+ }
}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
index 0b2f96899e2..2f40d74b5b4 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
@@ -25,4 +25,6 @@ public class Constants {
public static final String SET_KEY = "key";
public static final String SET_VALUE = "value";
public static final String COMPLETION_HINTS = "hints";
+
+ public static final String SAVEPOINT_PATH = "savepoint_path";
}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 23ccdabade1..d95d7d486c3 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -18,11 +18,16 @@
package org.apache.flink.table.gateway.service;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
@@ -55,7 +60,10 @@ import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.runtime.batch.sql.TestModule;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
import org.apache.flink.table.planner.utils.TableFunc0;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.UserClassLoaderJarTestUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -64,11 +72,16 @@ import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Condition;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -97,7 +110,6 @@ import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_
import static org.apache.flink.types.RowKind.DELETE;
import static org.apache.flink.types.RowKind.INSERT;
import static org.apache.flink.types.RowKind.UPDATE_AFTER;
-import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
@@ -105,8 +117,17 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
public class SqlGatewayServiceITCase extends AbstractTestBase {
@RegisterExtension
+ @Order(1)
+ public static final MiniClusterExtension MINI_CLUSTER =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .build());
+
+ @RegisterExtension
+ @Order(2)
public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
- new SqlGatewayServiceExtension();
+ new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration);
private static SessionManager sessionManager;
private static SqlGatewayServiceImpl service;
@@ -291,15 +312,8 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
Duration.ofSeconds(10),
"Failed to wait operation finish.");
- Long token = 0L;
List<RowData> expectedData = getDefaultResultSet().getData();
- List<RowData> actualData = new ArrayList<>();
- while (token != null) {
- ResultSet currentResult =
- service.fetchResults(sessionHandle, operationHandle, token, 1);
- actualData.addAll(checkNotNull(currentResult.getData()));
- token = currentResult.getNextToken();
- }
+ List<RowData> actualData = fetchAllResults(sessionHandle, operationHandle);
assertThat(actualData).isEqualTo(expectedData);
service.closeOperation(sessionHandle, operationHandle);
@@ -379,14 +393,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
-1,
Configuration.fromMap(Collections.singletonMap(key, value)));
- Long token = 0L;
- List<RowData> settings = new ArrayList<>();
- while (token != null) {
- ResultSet result =
- service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE);
- settings.addAll(result.getData());
- token = result.getNextToken();
- }
+ List<RowData> settings = fetchAllResults(sessionHandle, operationHandle);
assertThat(settings)
.contains(
@@ -394,6 +401,54 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
StringData.fromString(key), StringData.fromString(value)));
}
+ @ParameterizedTest
+ @CsvSource({"WITH SAVEPOINT,true", "WITH SAVEPOINT WITH DRAIN,true", "'',false"})
+ public void testStopJobStatementWithSavepoint(
+ String option,
+ boolean hasSavepoint,
+ @InjectClusterClient RestClusterClient<?> restClusterClient,
+ @TempDir File tmpDir)
+ throws Exception {
+ Configuration configuration = new Configuration(MINI_CLUSTER.getClientConfiguration());
+ configuration.setBoolean(TableConfigOptions.TABLE_DML_SYNC, false);
+ File savepointDir = new File(tmpDir, "savepoints");
+ configuration.set(
+ CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
+
+ SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+ String sourceDdl = "CREATE TABLE source (a STRING) WITH ('connector'='datagen');";
+ String sinkDdl = "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');";
+ String insertSql = "INSERT INTO sink SELECT * FROM source;";
+ String stopSqlTemplate = "STOP JOB '%s' %s;";
+
+ service.executeStatement(sessionHandle, sourceDdl, -1, configuration);
+ service.executeStatement(sessionHandle, sinkDdl, -1, configuration);
+
+ OperationHandle insertOperationHandle =
+ service.executeStatement(sessionHandle, insertSql, -1, configuration);
+
+ List<RowData> results = fetchAllResults(sessionHandle, insertOperationHandle);
+ assertThat(results.size()).isEqualTo(1);
+ String jobId = results.get(0).getString(0).toString();
+
+ TestUtils.waitUntilAllTasksAreRunning(restClusterClient, JobID.fromHexString(jobId));
+
+ String stopSql = String.format(stopSqlTemplate, jobId, option);
+ OperationHandle stopOperationHandle =
+ service.executeStatement(sessionHandle, stopSql, -1, configuration);
+
+ List<RowData> stopResults = fetchAllResults(sessionHandle, stopOperationHandle);
+ assertThat(stopResults.size()).isEqualTo(1);
+ if (hasSavepoint) {
+ String savepoint = stopResults.get(0).getString(0).toString();
+ Path savepointPath = Paths.get(savepoint);
+ assertThat(savepointPath.getFileName().toString()).startsWith("savepoint-");
+ } else {
+ assertThat(stopResults.get(0).getString(0).toString()).isEqualTo("OK");
+ }
+ }
+
@Test
public void testGetOperationSchemaUntilOperationIsReady() throws Exception {
runGetOperationSchemaUntilOperationIsReadyOrError(
@@ -1068,4 +1123,17 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
assertThat(service.completeStatement(sessionHandle, incompleteSql, incompleteSql.length()))
.isEqualTo(expectedCompletionHints);
}
+
+ private List<RowData> fetchAllResults(
+ SessionHandle sessionHandle, OperationHandle operationHandle) {
+ Long token = 0L;
+ List<RowData> results = new ArrayList<>();
+ while (token != null) {
+ ResultSet result =
+ service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE);
+ results.addAll(result.getData());
+ token = result.getNextToken();
+ }
+ return results;
+ }
}
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestUtils.java
index a5b3d158511..4adfbfe9765 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestUtils.java
@@ -22,13 +22,19 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
+import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.ExceptionUtils;
@@ -42,6 +48,7 @@ import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Comparator;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -181,4 +188,40 @@ public class TestUtils {
Thread.sleep(50);
}
}
+
+ /**
+ * Waits until all tasks of a job are in RUNNING state.
+ *
+ * @param restClusterClient the {@link RestClusterClient} of the cluster under test, e.g.
+ *     obtained via {@link org.apache.flink.test.junit5.InjectClusterClient}
+ * @param jobId the ID of the job whose tasks should reach RUNNING
+ */
+ public static void waitUntilAllTasksAreRunning(
+ RestClusterClient<?> restClusterClient, JobID jobId) throws Exception {
+ // access the REST endpoint of the cluster to determine the state of each ExecutionVertex
+ final JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
+ final JobMessageParameters params = detailsHeaders.getUnresolvedMessageParameters();
+ params.jobPathParameter.resolve(jobId);
+
+ CommonTestUtils.waitUntilCondition(
+ () ->
+ restClusterClient
+ .sendRequest(detailsHeaders, params, EmptyRequestBody.getInstance())
+ .thenApply(
+ detailsInfo ->
+ allVerticesRunning(
+ detailsInfo.getJobVerticesPerState()))
+ .get());
+ }
+
+ private static boolean allVerticesRunning(Map<ExecutionState, Integer> states) {
+ return states.entrySet().stream()
+ .allMatch(
+ entry -> {
+ if (entry.getKey() == ExecutionState.RUNNING) {
+ return entry.getValue() > 0;
+ } else {
+ return entry.getValue() == 0; // no vertices in non-running state.
+ }
+ });
+ }
}
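For reference, a minimal usage sketch of the new helper (the surrounding test
class, MiniClusterExtension wiring, and the submitJob() helper are assumed;
see SqlGatewayServiceITCase above for an actual caller):

    @Test
    void testAfterAllTasksAreRunning(
            @InjectClusterClient RestClusterClient<?> restClusterClient) throws Exception {
        // submitJob() is a hypothetical helper returning the ID of a freshly
        // submitted streaming job.
        JobID jobId = submitJob();
        // Poll the REST endpoint until every ExecutionVertex reports RUNNING.
        TestUtils.waitUntilAllTasksAreRunning(restClusterClient, jobId);
        // The job is now fully deployed; stopping it (e.g. STOP JOB ... WITH
        // SAVEPOINT) can no longer race against task scheduling.
    }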