You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/28 11:32:38 UTC

[flink-kubernetes-operator] branch release-0.1 updated: [FLINK-26881] Pick up the savepoint directory in the flink config when trigger savepoint

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch release-0.1
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/release-0.1 by this push:
     new cc35836  [FLINK-26881] Pick up the savepoint directory in the flink config when trigger savepoint
cc35836 is described below

commit cc3583632dc31aa48dd8f186c15ea5082e00a173
Author: Yang Wang <da...@alibaba-inc.com>
AuthorDate: Mon Mar 28 19:30:11 2022 +0800

    [FLINK-26881] Pick up the savepoint directory in the flink config when trigger savepoint
---
 .../kubernetes/operator/service/FlinkService.java  | 41 +++++++++++++---
 .../kubernetes/operator/TestingClusterClient.java  | 42 ++++++++++++++--
 .../operator/service/FlinkServiceTest.java         | 57 +++++++++++++++++++---
 3 files changed, 123 insertions(+), 17 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index e17f185..c4aaf02 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -28,6 +28,7 @@ import org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -51,6 +52,9 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMess
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 
 import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -70,6 +74,7 @@ import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Service for submitting and interacting with Flink clusters and jobs. */
 public class FlinkService {
@@ -167,20 +172,37 @@ public class FlinkService {
             @Nullable JobID jobID, UpgradeMode upgradeMode, Configuration conf) throws Exception {
         Optional<String> savepointOpt = Optional.empty();
         try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
+            final String clusterId = clusterClient.getClusterId();
             switch (upgradeMode) {
                 case STATELESS:
                     clusterClient.cancel(jobID).get(1, TimeUnit.MINUTES);
                     break;
                 case SAVEPOINT:
-                    String savepoint =
-                            clusterClient
-                                    .stopWithSavepoint(jobID, false, null)
-                                    .get(1, TimeUnit.MINUTES);
-                    savepointOpt = Optional.of(savepoint);
+                    final String savepointDirectory =
+                            Preconditions.checkNotNull(
+                                    conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
+                    final long timeout =
+                            conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)
+                                    .getSeconds();
+                    try {
+                        String savepoint =
+                                clusterClient
+                                        .stopWithSavepoint(jobID, false, savepointDirectory)
+                                        .get(timeout, TimeUnit.SECONDS);
+                        savepointOpt = Optional.of(savepoint);
+                    } catch (TimeoutException exception) {
+                        throw new FlinkException(
+                                String.format(
+                                        "Timed out stopping the job %s in Flink cluster %s with savepoint, "
+                                                + "please configure a larger timeout via '%s'",
+                                        jobID,
+                                        clusterId,
+                                        ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()),
+                                exception);
+                    }
                     break;
                 case LAST_STATE:
                     final String namespace = conf.getString(KubernetesConfigOptions.NAMESPACE);
-                    final String clusterId = clusterClient.getClusterId();
                     FlinkUtils.deleteCluster(namespace, clusterId, kubernetesClient, false);
                     break;
                 default:
@@ -207,13 +229,16 @@ public class FlinkService {
             savepointTriggerMessageParameters.jobID.resolve(
                     JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId()));
 
+            final String savepointDirectory =
+                    Preconditions.checkNotNull(conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
+            final long timeout = operatorConfiguration.getFlinkClientTimeout().getSeconds();
             TriggerResponse response =
                     clusterClient
                             .sendRequest(
                                     savepointTriggerHeaders,
                                     savepointTriggerMessageParameters,
-                                    new SavepointTriggerRequestBody(null, false))
-                            .get();
+                                    new SavepointTriggerRequestBody(savepointDirectory, false))
+                            .get(timeout, TimeUnit.SECONDS);
             LOG.info("Savepoint successfully triggered: " + response.getTriggerId().toHexString());
 
             org.apache.flink.kubernetes.operator.crd.status.SavepointInfo savepointInfo =
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
index 36cd663..43a7445 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
@@ -20,15 +20,21 @@ package org.apache.flink.kubernetes.operator;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.util.function.TriFunction;
 
 import javax.annotation.Nonnull;
@@ -40,7 +46,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
 /** Testing ClusterClient used implementation. */
-public class TestingClusterClient<T> implements ClusterClient<T> {
+public class TestingClusterClient<T> extends RestClusterClient<T> {
 
     private Function<JobID, CompletableFuture<Acknowledge>> cancelFunction =
             ignore -> CompletableFuture.completedFuture(Acknowledge.get());
@@ -48,10 +54,19 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
             stopWithSavepointFunction =
                     (ignore1, ignore2, savepointPath) ->
                             CompletableFuture.completedFuture(savepointPath);
+    private TriFunction<
+                    MessageHeaders<?, ?, ?>,
+                    MessageParameters,
+                    RequestBody,
+                    CompletableFuture<ResponseBody>>
+            triggerSavepointFunction =
+                    (ignore1, ignore2, ignore) ->
+                            CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
 
     private final T clusterId;
 
-    public TestingClusterClient(T clusterId) {
+    public TestingClusterClient(Configuration configuration, T clusterId) throws Exception {
+        super(configuration, clusterId, (c, e) -> new StandaloneClientHAServices("localhost"));
         this.clusterId = clusterId;
     }
 
@@ -65,6 +80,16 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
         this.stopWithSavepointFunction = stopWithSavepointFunction;
     }
 
+    public void setTriggerSavepointFunction(
+            TriFunction<
+                            MessageHeaders<?, ?, ?>,
+                            MessageParameters,
+                            RequestBody,
+                            CompletableFuture<ResponseBody>>
+                    triggerSavepointFunction) {
+        this.triggerSavepointFunction = triggerSavepointFunction;
+    }
+
     @Override
     public T getClusterId() {
         return clusterId;
@@ -146,4 +171,15 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
 
     @Override
     public void close() {}
+
+    @Override
+    public <
+                    M extends MessageHeaders<R, P, U>,
+                    U extends MessageParameters,
+                    R extends RequestBody,
+                    P extends ResponseBody>
+            CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters, R request) {
+        return (CompletableFuture<P>)
+                triggerSavepointFunction.apply(messageHeaders, messageParameters, request);
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
index 7fc837f..6c66304 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
@@ -20,18 +20,26 @@ package org.apache.flink.kubernetes.operator.service;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
+import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingClusterClient;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
 
 import io.fabric8.kubernetes.api.model.apps.Deployment;
 import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -62,7 +70,7 @@ public class FlinkServiceTest {
     @Test
     public void testCancelJobWithStatelessUpgradeMode() throws Exception {
         final TestingClusterClient<String> testingClusterClient =
-                new TestingClusterClient<>(CLUSTER_ID);
+                new TestingClusterClient<>(configuration, CLUSTER_ID);
         final CompletableFuture<JobID> cancelFuture = new CompletableFuture<>();
         testingClusterClient.setCancelFunction(
                 jobID -> {
@@ -83,10 +91,11 @@ public class FlinkServiceTest {
     @Test
     public void testCancelJobWithSavepointUpgradeMode() throws Exception {
         final TestingClusterClient<String> testingClusterClient =
-                new TestingClusterClient<>(CLUSTER_ID);
+                new TestingClusterClient<>(configuration, CLUSTER_ID);
         final CompletableFuture<Tuple3<JobID, Boolean, String>> stopWithSavepointFuture =
                 new CompletableFuture<>();
         final String savepointPath = "file:///path/of/svp-1";
+        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath);
         testingClusterClient.setStopWithSavepointFunction(
                 (jobID, advanceToEndOfEventTime, savepointDir) -> {
                     stopWithSavepointFuture.complete(
@@ -102,7 +111,7 @@ public class FlinkServiceTest {
         assertTrue(stopWithSavepointFuture.isDone());
         assertEquals(jobID, stopWithSavepointFuture.get().f0);
         assertFalse(stopWithSavepointFuture.get().f1);
-        assertNull(stopWithSavepointFuture.get().f2);
+        assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
         assertTrue(result.isPresent());
         assertEquals(savepointPath, result.get());
     }
@@ -112,8 +121,9 @@ public class FlinkServiceTest {
         configuration.set(
                 HighAvailabilityOptions.HA_MODE,
                 KubernetesHaServicesFactory.class.getCanonicalName());
+        configuration.set(HighAvailabilityOptions.HA_STORAGE_PATH, "file:///path/of/ha");
         final TestingClusterClient<String> testingClusterClient =
-                new TestingClusterClient<>(CLUSTER_ID);
+                new TestingClusterClient<>(configuration, CLUSTER_ID);
         final FlinkService flinkService = createFlinkService(testingClusterClient);
 
         client.apps()
@@ -138,8 +148,43 @@ public class FlinkServiceTest {
                         .get());
     }
 
+    @Test
+    public void testTriggerSavepoint() throws Exception {
+        final TestingClusterClient<String> testingClusterClient =
+                new TestingClusterClient<>(configuration, CLUSTER_ID);
+        final CompletableFuture<Tuple3<JobID, String, Boolean>> triggerSavepointFuture =
+                new CompletableFuture<>();
+        final String savepointPath = "file:///path/of/svp";
+        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath);
+        testingClusterClient.setTriggerSavepointFunction(
+                (headers, parameters, requestBody) -> {
+                    triggerSavepointFuture.complete(
+                            new Tuple3<>(
+                                    ((SavepointTriggerMessageParameters) parameters)
+                                            .jobID.getValue(),
+                                    ((SavepointTriggerRequestBody) requestBody)
+                                            .getTargetDirectory(),
+                                    ((SavepointTriggerRequestBody) requestBody).isCancelJob()));
+                    return CompletableFuture.completedFuture(new TriggerResponse(new TriggerId()));
+                });
+
+        final FlinkService flinkService = createFlinkService(testingClusterClient);
+
+        final JobID jobID = JobID.generate();
+        final FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();
+        JobStatus jobStatus = new JobStatus();
+        jobStatus.setJobId(jobID.toString());
+        flinkDeployment.getStatus().setJobStatus(jobStatus);
+        flinkService.triggerSavepoint(flinkDeployment, configuration);
+        assertTrue(triggerSavepointFuture.isDone());
+        assertEquals(jobID, triggerSavepointFuture.get().f0);
+        assertEquals(savepointPath, triggerSavepointFuture.get().f1);
+        assertFalse(triggerSavepointFuture.get().f2);
+    }
+
     private FlinkService createFlinkService(ClusterClient<String> clusterClient) {
-        return new FlinkService((NamespacedKubernetesClient) client, null) {
+        return new FlinkService(
+                client, FlinkOperatorConfiguration.fromConfiguration(configuration)) {
             @Override
             protected ClusterClient<String> getClusterClient(Configuration config) {
                 return clusterClient;