Posted to commits@flink.apache.org by wa...@apache.org on 2022/02/25 08:01:09 UTC

[flink-kubernetes-operator] 01/02: [FLINK-26141] Support last-state upgrade mode

This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 2e6e978abd499d1b534e069b1d296f33d4300be9
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Thu Feb 24 15:47:41 2022 +0800

    [FLINK-26141] Support last-state upgrade mode
---
 .../operator/reconciler/JobReconciler.java         |  57 +++----
 .../kubernetes/operator/service/FlinkService.java  |  15 +-
 .../kubernetes/operator/utils/FlinkUtils.java      |  12 +-
 .../kubernetes/operator/TestingClusterClient.java  | 149 ++++++++++++++++++
 .../operator/reconciler/JobReconcilerTest.java     |  80 ++++++++--
 .../operator/service/FlinkServiceTest.java         | 175 +++++++++++++++++++++
 6 files changed, 444 insertions(+), 44 deletions(-)
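
In short, besides the existing stateless and savepoint strategies, the reconciler can now upgrade a job in last-state mode: the JobManager deployment is deleted while Flink's Kubernetes HA metadata is left in place, and the redeployed job resumes from the last retained state. A minimal sketch of how a spec change might drive this path, reusing only names that appear in the diff below (buildApplicationCluster() is a test helper; the snippet is illustrative, not part of the commit):

    // Hedged sketch: request the new last-state upgrade mode on a deployment spec.
    FlinkDeployment flinkApp = TestUtils.buildApplicationCluster();
    flinkApp.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
    flinkApp.getSpec().getJob().setState(JobState.RUNNING);
    // On the next reconcile, JobReconciler takes the LAST_STATE branch added below.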

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index dee22fb..fb599e8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -25,7 +25,6 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -80,23 +79,21 @@ public class JobReconciler {
                     upgradeFlinkJob(flinkApp, effectiveConfig);
                 }
                 if (desiredJobState.equals(JobState.SUSPENDED)) {
-                    if (upgradeMode == UpgradeMode.STATELESS) {
-                        cancelJob(flinkApp, effectiveConfig);
-                    } else {
-                        suspendJob(flinkApp, effectiveConfig);
-                    }
+                    printCancelLogs(upgradeMode, flinkApp.getMetadata().getName());
+                    cancelJob(flinkApp, upgradeMode, effectiveConfig);
                 }
             }
-            if (currentJobState == JobState.SUSPENDED) {
-                if (desiredJobState == JobState.RUNNING) {
-                    if (upgradeMode == UpgradeMode.STATELESS) {
-                        deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
-                    } else if (upgradeMode == UpgradeMode.SAVEPOINT) {
-                        restoreFromLastSavepoint(flinkApp, effectiveConfig);
-                    } else {
-                        throw new InvalidDeploymentException(
-                                "Only savepoint and stateless strategies are supported at the moment.");
-                    }
+            if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
+                if (upgradeMode == UpgradeMode.STATELESS) {
+                    deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
+                } else if (upgradeMode == UpgradeMode.SAVEPOINT) {
+                    restoreFromLastSavepoint(flinkApp, effectiveConfig);
+                } else if (upgradeMode == UpgradeMode.LAST_STATE) {
+                    final String savepointLocation =
+                            flinkApp.getStatus().getJobStatus().getSavepointLocation();
+                    // Upgrade mode changes from savepoint -> last-state
+                    deployFlinkJob(
+                            flinkApp, effectiveConfig, Optional.ofNullable(savepointLocation));
                 }
             }
         }
@@ -116,7 +113,8 @@ public class JobReconciler {
     private void upgradeFlinkJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
             throws Exception {
         LOG.info("Upgrading running job");
-        Optional<String> savepoint = cancelJob(flinkApp, effectiveConfig);
+        final Optional<String> savepoint =
+                cancelJob(flinkApp, flinkApp.getSpec().getJob().getUpgradeMode(), effectiveConfig);
         deployFlinkJob(flinkApp, effectiveConfig, savepoint);
     }
 
@@ -126,17 +124,20 @@ public class JobReconciler {
         deployFlinkJob(flinkApp, effectiveConfig, Optional.of(jobStatus.getSavepointLocation()));
     }
 
-    private Optional<String> suspendJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
-            throws Exception {
-        LOG.info("Suspending {}", flinkApp.getMetadata().getName());
-        return cancelJob(flinkApp, UpgradeMode.SAVEPOINT, effectiveConfig);
-    }
-
-    private Optional<String> cancelJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
-            throws Exception {
-        LOG.info("Cancelling {}", flinkApp.getMetadata().getName());
-        UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
-        return cancelJob(flinkApp, upgradeMode, effectiveConfig);
+    private void printCancelLogs(UpgradeMode upgradeMode, String name) {
+        switch (upgradeMode) {
+            case STATELESS:
+                LOG.info("Cancelling {}", name);
+                break;
+            case SAVEPOINT:
+                LOG.info("Suspending {}", name);
+                break;
+            case LAST_STATE:
+                LOG.info("Cancelling {} with last state retained", name);
+                break;
+            default:
+                throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
+        }
     }
 
     private Optional<String> cancelJob(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index c8edf01..8b9af3b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.service;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.cli.ApplicationDeployer;
 import org.apache.flink.client.deployment.ClusterClientFactory;
@@ -34,9 +35,11 @@ import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import org.slf4j.Logger;
@@ -97,7 +100,8 @@ public class FlinkService {
         }
     }
 
-    private ClusterClient<String> getClusterClient(Configuration config) throws Exception {
+    @VisibleForTesting
+    protected ClusterClient<String> getClusterClient(Configuration config) throws Exception {
         final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
         final String namespace = config.get(KubernetesConfigOptions.NAMESPACE);
         final int port = config.getInteger(RestOptions.PORT);
@@ -125,6 +129,15 @@ public class FlinkService {
                                     .get(1, TimeUnit.MINUTES);
                     savepointOpt = Optional.of(savepoint);
                     break;
+                case LAST_STATE:
+                    if (!HighAvailabilityMode.isHighAvailabilityModeActivated(conf)) {
+                        throw new InvalidDeploymentException(
+                                "Job could not be upgraded with last-state mode while HA is disabled");
+                    }
+                    final String namespace = conf.getString(KubernetesConfigOptions.NAMESPACE);
+                    final String clusterId = clusterClient.getClusterId();
+                    FlinkUtils.deleteCluster(namespace, clusterId, kubernetesClient);
+                    break;
                 default:
                     throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
             }
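
For the new LAST_STATE case no cancel or stop-with-savepoint call is issued: the JobManager deployment is simply deleted, and the HA data left behind is what the next deployment recovers from, which is why the branch refuses to proceed unless HA is activated. A hedged sketch of that guard, using only options and classes visible elsewhere in this diff (the Configuration instance is illustrative):

    // Last-state upgrades require HA, e.g. Kubernetes HA services.
    Configuration conf = new Configuration();
    conf.set(HighAvailabilityOptions.HA_MODE,
            KubernetesHaServicesFactory.class.getCanonicalName());
    // Mirrors the check in FlinkService#cancelJob above.
    boolean lastStateAllowed = HighAvailabilityMode.isHighAvailabilityModeActivated(conf);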
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index fd8061a..8a0ef5c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -109,11 +109,19 @@ public class FlinkUtils {
     }
 
     public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+        deleteCluster(
+                flinkApp.getMetadata().getNamespace(),
+                flinkApp.getMetadata().getName(),
+                kubernetesClient);
+    }
+
+    public static void deleteCluster(
+            String namespace, String clusterId, KubernetesClient kubernetesClient) {
         kubernetesClient
                 .apps()
                 .deployments()
-                .inNamespace(flinkApp.getMetadata().getNamespace())
-                .withName(flinkApp.getMetadata().getName())
+                .inNamespace(namespace)
+                .withName(clusterId)
                 .cascading(true)
                 .delete();
     }
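
The new overload is needed because the last-state path in FlinkService only has the namespace and cluster id from the effective configuration, not a FlinkDeployment object. A hedged usage sketch (the literal values are illustrative, borrowed from the test constants further down):

    // Delete the JobManager deployment by namespace and cluster id,
    // as FlinkService#cancelJob now does for LAST_STATE.
    FlinkUtils.deleteCluster("test", "testing-flink-cluster", kubernetesClient);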
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
new file mode 100644
index 0000000..36cd663
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.util.function.TriFunction;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/** Testing {@link ClusterClient} implementation. */
+public class TestingClusterClient<T> implements ClusterClient<T> {
+
+    private Function<JobID, CompletableFuture<Acknowledge>> cancelFunction =
+            ignore -> CompletableFuture.completedFuture(Acknowledge.get());
+    private TriFunction<JobID, Boolean, String, CompletableFuture<String>>
+            stopWithSavepointFunction =
+                    (ignore1, ignore2, savepointPath) ->
+                            CompletableFuture.completedFuture(savepointPath);
+
+    private final T clusterId;
+
+    public TestingClusterClient(T clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    public void setCancelFunction(Function<JobID, CompletableFuture<Acknowledge>> cancelFunction) {
+        this.cancelFunction = cancelFunction;
+    }
+
+    public void setStopWithSavepointFunction(
+            TriFunction<JobID, Boolean, String, CompletableFuture<String>>
+                    stopWithSavepointFunction) {
+        this.stopWithSavepointFunction = stopWithSavepointFunction;
+    }
+
+    @Override
+    public T getClusterId() {
+        return clusterId;
+    }
+
+    @Override
+    public Configuration getFlinkConfiguration() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void shutDownCluster() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getWebInterfaceURL() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> cancel(JobID jobId) {
+        return cancelFunction.apply(jobId);
+    }
+
+    @Override
+    public CompletableFuture<String> cancelWithSavepoint(
+            JobID jobId, @Nullable String savepointDirectory) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(
+            JobID jobId, boolean advanceToEndOfEventTime, @Nullable String savepointDirectory) {
+        return stopWithSavepointFunction.apply(jobId, advanceToEndOfEventTime, savepointDirectory);
+    }
+
+    @Override
+    public CompletableFuture<String> triggerSavepoint(
+            JobID jobId, @Nullable String savepointDirectory) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
+            JobID jobId, OperatorID operatorId, CoordinationRequest request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void close() {}
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index a7f7f36..8f99806 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -22,10 +22,12 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
 import org.junit.jupiter.api.Test;
 
@@ -33,6 +35,7 @@ import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** @link JobStatusObserver unit tests */
 public class JobReconcilerTest {
@@ -47,19 +50,7 @@ public class JobReconcilerTest {
 
         reconciler.reconcile("test", deployment, config);
         List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs();
-        assertEquals(1, runningJobs.size());
-        assertNull(runningJobs.get(0).f0);
-        deployment
-                .getStatus()
-                .getReconciliationStatus()
-                .setLastReconciledSpec(deployment.getSpec());
-
-        JobStatus jobStatus = new JobStatus();
-        jobStatus.setJobName(runningJobs.get(0).f1.getJobName());
-        jobStatus.setJobId(runningJobs.get(0).f1.getJobId().toHexString());
-        jobStatus.setState("RUNNING");
-
-        deployment.getStatus().setJobStatus(jobStatus);
+        verifyAndSetRunningJobsToStatus(deployment, runningJobs);
 
         // Test stateless upgrade
         FlinkDeployment statelessUpgrade = TestUtils.clone(deployment);
@@ -87,4 +78,67 @@ public class JobReconcilerTest {
         assertEquals(1, runningJobs.size());
         assertEquals("savepoint_0", runningJobs.get(0).f0);
     }
+
+    @Test
+    public void testUpgradeModeChangeFromSavepointToLastState() throws Exception {
+        final String expectedSavepointPath = "savepoint_0";
+        final TestingFlinkService flinkService = new TestingFlinkService();
+
+        final JobReconciler reconciler = new JobReconciler(null, flinkService);
+        final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        final Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
+
+        reconciler.reconcile("test", deployment, config);
+        List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs();
+        verifyAndSetRunningJobsToStatus(deployment, runningJobs);
+
+        // Suspend FlinkDeployment with savepoint upgrade mode
+        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+        deployment.getSpec().getJob().setState(JobState.SUSPENDED);
+        deployment.getSpec().setImage("new-image-1");
+
+        reconciler.reconcile("test", deployment, config);
+        assertEquals(0, flinkService.listJobs().size());
+        assertTrue(
+                JobState.SUSPENDED
+                        .name()
+                        .equalsIgnoreCase(deployment.getStatus().getJobStatus().getState()));
+        assertEquals(
+                expectedSavepointPath,
+                deployment.getStatus().getJobStatus().getSavepointLocation());
+
+        // Resume FlinkDeployment with last-state upgrade mode
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .getLastReconciledSpec()
+                .getJob()
+                .setState(JobState.SUSPENDED);
+        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+        deployment.getSpec().getJob().setState(JobState.RUNNING);
+        deployment.getSpec().setImage("new-image-2");
+
+        reconciler.reconcile("test", deployment, config);
+        runningJobs = flinkService.listJobs();
+        assertEquals(expectedSavepointPath, config.get(SavepointConfigOptions.SAVEPOINT_PATH));
+        assertEquals(1, runningJobs.size());
+        assertEquals(expectedSavepointPath, runningJobs.get(0).f0);
+    }
+
+    private void verifyAndSetRunningJobsToStatus(
+            FlinkDeployment deployment, List<Tuple2<String, JobStatusMessage>> runningJobs) {
+        assertEquals(1, runningJobs.size());
+        assertNull(runningJobs.get(0).f0);
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .setLastReconciledSpec(TestUtils.clone(deployment.getSpec()));
+
+        JobStatus jobStatus = new JobStatus();
+        jobStatus.setJobName(runningJobs.get(0).f1.getJobName());
+        jobStatus.setJobId(runningJobs.get(0).f1.getJobId().toHexString());
+        jobStatus.setState("RUNNING");
+
+        deployment.getStatus().setJobStatus(jobStatus);
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
new file mode 100644
index 0000000..594aeea
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
+import org.apache.flink.kubernetes.operator.TestingClusterClient;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** @link FlinkService unit tests */
+@EnableKubernetesMockClient(crud = true)
+public class FlinkServiceTest {
+    KubernetesClient client;
+    private final Configuration configuration = new Configuration();
+    private static final String CLUSTER_ID = "testing-flink-cluster";
+    private static final String TESTING_NAMESPACE = "test";
+
+    @BeforeEach
+    public void setup() {
+        configuration.set(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
+        configuration.set(KubernetesConfigOptions.NAMESPACE, TESTING_NAMESPACE);
+    }
+
+    @Test
+    public void testCancelJobWithStatelessUpgradeMode() throws Exception {
+        final TestingClusterClient<String> testingClusterClient =
+                new TestingClusterClient<>(CLUSTER_ID);
+        final CompletableFuture<JobID> cancelFuture = new CompletableFuture<>();
+        testingClusterClient.setCancelFunction(
+                jobID -> {
+                    cancelFuture.complete(jobID);
+                    return CompletableFuture.completedFuture(Acknowledge.get());
+                });
+
+        final FlinkService flinkService = createFlinkService(testingClusterClient);
+
+        final JobID jobID = JobID.generate();
+        Optional<String> result =
+                flinkService.cancelJob(jobID, UpgradeMode.STATELESS, configuration);
+        assertTrue(cancelFuture.isDone());
+        assertEquals(jobID, cancelFuture.get());
+        assertFalse(result.isPresent());
+    }
+
+    @Test
+    public void testCancelJobWithSavepointUpgradeMode() throws Exception {
+        final TestingClusterClient<String> testingClusterClient =
+                new TestingClusterClient<>(CLUSTER_ID);
+        final CompletableFuture<Tuple3<JobID, Boolean, String>> stopWithSavepointFuture =
+                new CompletableFuture<>();
+        final String savepointPath = "file:///path/of/svp-1";
+        testingClusterClient.setStopWithSavepointFunction(
+                (jobID, advanceToEndOfEventTime, savepointDir) -> {
+                    stopWithSavepointFuture.complete(
+                            new Tuple3<>(jobID, advanceToEndOfEventTime, savepointDir));
+                    return CompletableFuture.completedFuture(savepointPath);
+                });
+
+        final FlinkService flinkService = createFlinkService(testingClusterClient);
+
+        final JobID jobID = JobID.generate();
+        Optional<String> result =
+                flinkService.cancelJob(jobID, UpgradeMode.SAVEPOINT, configuration);
+        assertTrue(stopWithSavepointFuture.isDone());
+        assertEquals(jobID, stopWithSavepointFuture.get().f0);
+        assertFalse(stopWithSavepointFuture.get().f1);
+        assertNull(stopWithSavepointFuture.get().f2);
+        assertTrue(result.isPresent());
+        assertEquals(savepointPath, result.get());
+    }
+
+    @Test
+    public void testCancelJobWithLastStateUpgradeMode() throws Exception {
+        configuration.set(
+                HighAvailabilityOptions.HA_MODE,
+                KubernetesHaServicesFactory.class.getCanonicalName());
+        final TestingClusterClient<String> testingClusterClient =
+                new TestingClusterClient<>(CLUSTER_ID);
+        final FlinkService flinkService = createFlinkService(testingClusterClient);
+
+        client.apps()
+                .deployments()
+                .inNamespace(TESTING_NAMESPACE)
+                .create(createTestingDeployment());
+        assertNotNull(
+                client.apps()
+                        .deployments()
+                        .inNamespace(TESTING_NAMESPACE)
+                        .withName(CLUSTER_ID)
+                        .get());
+        final JobID jobID = JobID.generate();
+        Optional<String> result =
+                flinkService.cancelJob(jobID, UpgradeMode.LAST_STATE, configuration);
+        assertFalse(result.isPresent());
+        assertNull(
+                client.apps()
+                        .deployments()
+                        .inNamespace(TESTING_NAMESPACE)
+                        .withName(CLUSTER_ID)
+                        .get());
+    }
+
+    @Test
+    public void testCancelJobWithLastStateUpgradeModeWhenHADisabled() {
+        configuration.set(HighAvailabilityOptions.HA_MODE, "None");
+        final TestingClusterClient<String> testingClusterClient =
+                new TestingClusterClient<>(CLUSTER_ID);
+        final FlinkService flinkService = createFlinkService(testingClusterClient);
+
+        final JobID jobID = JobID.generate();
+        assertThrows(
+                InvalidDeploymentException.class,
+                () -> flinkService.cancelJob(jobID, UpgradeMode.LAST_STATE, configuration));
+    }
+
+    private FlinkService createFlinkService(ClusterClient<String> clusterClient) {
+        return new FlinkService((NamespacedKubernetesClient) client) {
+            @Override
+            protected ClusterClient<String> getClusterClient(Configuration config) {
+                return clusterClient;
+            }
+        };
+    }
+
+    private Deployment createTestingDeployment() {
+        return new DeploymentBuilder()
+                .withNewMetadata()
+                .withName(CLUSTER_ID)
+                .withNamespace(TESTING_NAMESPACE)
+                .endMetadata()
+                .withNewSpec()
+                .endSpec()
+                .build();
+    }
+}