You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/28 11:32:38 UTC
[flink-kubernetes-operator] branch release-0.1 updated: [FLINK-26881] Pick up the savepoint directory in the flink config when trigger savepoint
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch release-0.1
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/release-0.1 by this push:
new cc35836 [FLINK-26881] Pick up the savepoint directory in the flink config when trigger savepoint
cc35836 is described below
commit cc3583632dc31aa48dd8f186c15ea5082e00a173
Author: Yang Wang <da...@alibaba-inc.com>
AuthorDate: Mon Mar 28 19:30:11 2022 +0800
[FLINK-26881] Pick up the savepoint directory in the flink config when trigger savepoint
---
.../kubernetes/operator/service/FlinkService.java | 41 +++++++++++++---
.../kubernetes/operator/TestingClusterClient.java | 42 ++++++++++++++--
.../operator/service/FlinkServiceTest.java | 57 +++++++++++++++++++---
3 files changed, 123 insertions(+), 17 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index e17f185..c4aaf02 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -28,6 +28,7 @@ import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -51,6 +52,9 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMess
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -70,6 +74,7 @@ import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/** Service for submitting and interacting with Flink clusters and jobs. */
public class FlinkService {
@@ -167,20 +172,37 @@ public class FlinkService {
@Nullable JobID jobID, UpgradeMode upgradeMode, Configuration conf) throws Exception {
Optional<String> savepointOpt = Optional.empty();
try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
+ final String clusterId = clusterClient.getClusterId();
switch (upgradeMode) {
case STATELESS:
clusterClient.cancel(jobID).get(1, TimeUnit.MINUTES);
break;
case SAVEPOINT:
- String savepoint =
- clusterClient
- .stopWithSavepoint(jobID, false, null)
- .get(1, TimeUnit.MINUTES);
- savepointOpt = Optional.of(savepoint);
+ final String savepointDirectory =
+ Preconditions.checkNotNull(
+ conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
+ final long timeout =
+ conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)
+ .getSeconds();
+ try {
+ String savepoint =
+ clusterClient
+ .stopWithSavepoint(jobID, false, savepointDirectory)
+ .get(timeout, TimeUnit.SECONDS);
+ savepointOpt = Optional.of(savepoint);
+ } catch (TimeoutException exception) {
+ throw new FlinkException(
+ String.format(
+ "Timed out stopping the job %s in Flink cluster %s with savepoint, "
+ + "please configure a larger timeout via '%s'",
+ jobID,
+ clusterId,
+ ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()),
+ exception);
+ }
break;
case LAST_STATE:
final String namespace = conf.getString(KubernetesConfigOptions.NAMESPACE);
- final String clusterId = clusterClient.getClusterId();
FlinkUtils.deleteCluster(namespace, clusterId, kubernetesClient, false);
break;
default:
@@ -207,13 +229,16 @@ public class FlinkService {
savepointTriggerMessageParameters.jobID.resolve(
JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId()));
+ final String savepointDirectory =
+ Preconditions.checkNotNull(conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
+ final long timeout = operatorConfiguration.getFlinkClientTimeout().getSeconds();
TriggerResponse response =
clusterClient
.sendRequest(
savepointTriggerHeaders,
savepointTriggerMessageParameters,
- new SavepointTriggerRequestBody(null, false))
- .get();
+ new SavepointTriggerRequestBody(savepointDirectory, false))
+ .get(timeout, TimeUnit.SECONDS);
LOG.info("Savepoint successfully triggered: " + response.getTriggerId().toHexString());
org.apache.flink.kubernetes.operator.crd.status.SavepointInfo savepointInfo =
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
index 36cd663..43a7445 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
@@ -20,15 +20,21 @@ package org.apache.flink.kubernetes.operator;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.util.function.TriFunction;
import javax.annotation.Nonnull;
@@ -40,7 +46,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
/** Testing ClusterClient used implementation. */
-public class TestingClusterClient<T> implements ClusterClient<T> {
+public class TestingClusterClient<T> extends RestClusterClient<T> {
private Function<JobID, CompletableFuture<Acknowledge>> cancelFunction =
ignore -> CompletableFuture.completedFuture(Acknowledge.get());
@@ -48,10 +54,19 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
stopWithSavepointFunction =
(ignore1, ignore2, savepointPath) ->
CompletableFuture.completedFuture(savepointPath);
+ private TriFunction<
+ MessageHeaders<?, ?, ?>,
+ MessageParameters,
+ RequestBody,
+ CompletableFuture<ResponseBody>>
+ triggerSavepointFunction =
+ (ignore1, ignore2, ignore) ->
+ CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
private final T clusterId;
- public TestingClusterClient(T clusterId) {
+ public TestingClusterClient(Configuration configuration, T clusterId) throws Exception {
+ super(configuration, clusterId, (c, e) -> new StandaloneClientHAServices("localhost"));
this.clusterId = clusterId;
}
@@ -65,6 +80,16 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
this.stopWithSavepointFunction = stopWithSavepointFunction;
}
+ public void setTriggerSavepointFunction(
+ TriFunction<
+ MessageHeaders<?, ?, ?>,
+ MessageParameters,
+ RequestBody,
+ CompletableFuture<ResponseBody>>
+ triggerSavepointFunction) {
+ this.triggerSavepointFunction = triggerSavepointFunction;
+ }
+
@Override
public T getClusterId() {
return clusterId;
@@ -146,4 +171,15 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
@Override
public void close() {}
+
+ @Override
+ public <
+ M extends MessageHeaders<R, P, U>,
+ U extends MessageParameters,
+ R extends RequestBody,
+ P extends ResponseBody>
+ CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters, R request) {
+ return (CompletableFuture<P>)
+ triggerSavepointFunction.apply(messageHeaders, messageParameters, request);
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
index 7fc837f..6c66304 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
@@ -20,18 +20,26 @@ package org.apache.flink.kubernetes.operator.service;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
+import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingClusterClient;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -62,7 +70,7 @@ public class FlinkServiceTest {
@Test
public void testCancelJobWithStatelessUpgradeMode() throws Exception {
final TestingClusterClient<String> testingClusterClient =
- new TestingClusterClient<>(CLUSTER_ID);
+ new TestingClusterClient<>(configuration, CLUSTER_ID);
final CompletableFuture<JobID> cancelFuture = new CompletableFuture<>();
testingClusterClient.setCancelFunction(
jobID -> {
@@ -83,10 +91,11 @@ public class FlinkServiceTest {
@Test
public void testCancelJobWithSavepointUpgradeMode() throws Exception {
final TestingClusterClient<String> testingClusterClient =
- new TestingClusterClient<>(CLUSTER_ID);
+ new TestingClusterClient<>(configuration, CLUSTER_ID);
final CompletableFuture<Tuple3<JobID, Boolean, String>> stopWithSavepointFuture =
new CompletableFuture<>();
final String savepointPath = "file:///path/of/svp-1";
+ configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath);
testingClusterClient.setStopWithSavepointFunction(
(jobID, advanceToEndOfEventTime, savepointDir) -> {
stopWithSavepointFuture.complete(
@@ -102,7 +111,7 @@ public class FlinkServiceTest {
assertTrue(stopWithSavepointFuture.isDone());
assertEquals(jobID, stopWithSavepointFuture.get().f0);
assertFalse(stopWithSavepointFuture.get().f1);
- assertNull(stopWithSavepointFuture.get().f2);
+ assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
assertTrue(result.isPresent());
assertEquals(savepointPath, result.get());
}
@@ -112,8 +121,9 @@ public class FlinkServiceTest {
configuration.set(
HighAvailabilityOptions.HA_MODE,
KubernetesHaServicesFactory.class.getCanonicalName());
+ configuration.set(HighAvailabilityOptions.HA_STORAGE_PATH, "file:///path/of/ha");
final TestingClusterClient<String> testingClusterClient =
- new TestingClusterClient<>(CLUSTER_ID);
+ new TestingClusterClient<>(configuration, CLUSTER_ID);
final FlinkService flinkService = createFlinkService(testingClusterClient);
client.apps()
@@ -138,8 +148,43 @@ public class FlinkServiceTest {
.get());
}
+ @Test
+ public void testTriggerSavepoint() throws Exception {
+ final TestingClusterClient<String> testingClusterClient =
+ new TestingClusterClient<>(configuration, CLUSTER_ID);
+ final CompletableFuture<Tuple3<JobID, String, Boolean>> triggerSavepointFuture =
+ new CompletableFuture<>();
+ final String savepointPath = "file:///path/of/svp";
+ configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath);
+ testingClusterClient.setTriggerSavepointFunction(
+ (headers, parameters, requestBody) -> {
+ triggerSavepointFuture.complete(
+ new Tuple3<>(
+ ((SavepointTriggerMessageParameters) parameters)
+ .jobID.getValue(),
+ ((SavepointTriggerRequestBody) requestBody)
+ .getTargetDirectory(),
+ ((SavepointTriggerRequestBody) requestBody).isCancelJob()));
+ return CompletableFuture.completedFuture(new TriggerResponse(new TriggerId()));
+ });
+
+ final FlinkService flinkService = createFlinkService(testingClusterClient);
+
+ final JobID jobID = JobID.generate();
+ final FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();
+ JobStatus jobStatus = new JobStatus();
+ jobStatus.setJobId(jobID.toString());
+ flinkDeployment.getStatus().setJobStatus(jobStatus);
+ flinkService.triggerSavepoint(flinkDeployment, configuration);
+ assertTrue(triggerSavepointFuture.isDone());
+ assertEquals(jobID, triggerSavepointFuture.get().f0);
+ assertEquals(savepointPath, triggerSavepointFuture.get().f1);
+ assertFalse(triggerSavepointFuture.get().f2);
+ }
+
private FlinkService createFlinkService(ClusterClient<String> clusterClient) {
- return new FlinkService((NamespacedKubernetesClient) client, null) {
+ return new FlinkService(
+ client, FlinkOperatorConfiguration.fromConfiguration(configuration)) {
@Override
protected ClusterClient<String> getClusterClient(Configuration config) {
return clusterClient;