You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2022/02/25 08:01:09 UTC
[flink-kubernetes-operator] 01/02: [FLINK-26141] Support last-state upgrade mode
This is an automated email from the ASF dual-hosted git repository.
wangyang0918 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 2e6e978abd499d1b534e069b1d296f33d4300be9
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Thu Feb 24 15:47:41 2022 +0800
[FLINK-26141] Support last-state upgrade mode
---
.../operator/reconciler/JobReconciler.java | 57 +++----
.../kubernetes/operator/service/FlinkService.java | 15 +-
.../kubernetes/operator/utils/FlinkUtils.java | 12 +-
.../kubernetes/operator/TestingClusterClient.java | 149 ++++++++++++++++++
.../operator/reconciler/JobReconcilerTest.java | 80 ++++++++--
.../operator/service/FlinkServiceTest.java | 175 +++++++++++++++++++++
6 files changed, 444 insertions(+), 44 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index dee22fb..fb599e8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -25,7 +25,6 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -80,23 +79,21 @@ public class JobReconciler {
upgradeFlinkJob(flinkApp, effectiveConfig);
}
if (desiredJobState.equals(JobState.SUSPENDED)) {
- if (upgradeMode == UpgradeMode.STATELESS) {
- cancelJob(flinkApp, effectiveConfig);
- } else {
- suspendJob(flinkApp, effectiveConfig);
- }
+ printCancelLogs(upgradeMode, flinkApp.getMetadata().getName());
+ cancelJob(flinkApp, upgradeMode, effectiveConfig);
}
}
- if (currentJobState == JobState.SUSPENDED) {
- if (desiredJobState == JobState.RUNNING) {
- if (upgradeMode == UpgradeMode.STATELESS) {
- deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
- } else if (upgradeMode == UpgradeMode.SAVEPOINT) {
- restoreFromLastSavepoint(flinkApp, effectiveConfig);
- } else {
- throw new InvalidDeploymentException(
- "Only savepoint and stateless strategies are supported at the moment.");
- }
+ if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
+ if (upgradeMode == UpgradeMode.STATELESS) {
+ deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
+ } else if (upgradeMode == UpgradeMode.SAVEPOINT) {
+ restoreFromLastSavepoint(flinkApp, effectiveConfig);
+ } else if (upgradeMode == UpgradeMode.LAST_STATE) {
+ final String savepointLocation =
+ flinkApp.getStatus().getJobStatus().getSavepointLocation();
+ // Upgrade mode changed from savepoint -> last-state: restore from the savepoint location stored in the job status, if present
+ deployFlinkJob(
+ flinkApp, effectiveConfig, Optional.ofNullable(savepointLocation));
}
}
}
@@ -116,7 +113,8 @@ public class JobReconciler {
private void upgradeFlinkJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
throws Exception {
LOG.info("Upgrading running job");
- Optional<String> savepoint = cancelJob(flinkApp, effectiveConfig);
+ final Optional<String> savepoint =
+ cancelJob(flinkApp, flinkApp.getSpec().getJob().getUpgradeMode(), effectiveConfig);
deployFlinkJob(flinkApp, effectiveConfig, savepoint);
}
@@ -126,17 +124,20 @@ public class JobReconciler {
deployFlinkJob(flinkApp, effectiveConfig, Optional.of(jobStatus.getSavepointLocation()));
}
- private Optional<String> suspendJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
- throws Exception {
- LOG.info("Suspending {}", flinkApp.getMetadata().getName());
- return cancelJob(flinkApp, UpgradeMode.SAVEPOINT, effectiveConfig);
- }
-
- private Optional<String> cancelJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
- throws Exception {
- LOG.info("Cancelling {}", flinkApp.getMetadata().getName());
- UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
- return cancelJob(flinkApp, upgradeMode, effectiveConfig);
+ private void printCancelLogs(UpgradeMode upgradeMode, String name) { // Logs the stop action that corresponds to upgradeMode for the resource called name.
+ switch (upgradeMode) {
+ case STATELESS:
+ LOG.info("Cancelling {}", name);
+ break;
+ case SAVEPOINT:
+ LOG.info("Suspending {}", name);
+ break;
+ case LAST_STATE:
+ LOG.info("Cancelling {} with last state retained", name);
+ break;
+ default:
+ throw new RuntimeException("Unsupported upgrade mode " + upgradeMode); // any other mode is rejected instead of being silently ignored
+ }
+ }
private Optional<String> cancelJob(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index c8edf01..8b9af3b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.service;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.cli.ApplicationDeployer;
import org.apache.flink.client.deployment.ClusterClientFactory;
@@ -34,9 +35,11 @@ import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import org.slf4j.Logger;
@@ -97,7 +100,8 @@ public class FlinkService {
}
}
- private ClusterClient<String> getClusterClient(Configuration config) throws Exception {
+ @VisibleForTesting
+ protected ClusterClient<String> getClusterClient(Configuration config) throws Exception {
final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
final String namespace = config.get(KubernetesConfigOptions.NAMESPACE);
final int port = config.getInteger(RestOptions.PORT);
@@ -125,6 +129,15 @@ public class FlinkService {
.get(1, TimeUnit.MINUTES);
savepointOpt = Optional.of(savepoint);
break;
+ case LAST_STATE:
+ if (!HighAvailabilityMode.isHighAvailabilityModeActivated(conf)) {
+ throw new InvalidDeploymentException(
+ "Job could not be upgraded with last-state while HA disabled");
+ }
+ final String namespace = conf.getString(KubernetesConfigOptions.NAMESPACE);
+ final String clusterId = clusterClient.getClusterId();
+ FlinkUtils.deleteCluster(namespace, clusterId, kubernetesClient);
+ break;
default:
throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index fd8061a..8a0ef5c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -109,11 +109,19 @@ public class FlinkUtils {
}
public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+ deleteCluster(
+ flinkApp.getMetadata().getNamespace(),
+ flinkApp.getMetadata().getName(),
+ kubernetesClient);
+ }
+
+ public static void deleteCluster(
+ String namespace, String clusterId, KubernetesClient kubernetesClient) {
kubernetesClient
.apps()
.deployments()
- .inNamespace(flinkApp.getMetadata().getNamespace())
- .withName(flinkApp.getMetadata().getName())
+ .inNamespace(namespace)
+ .withName(clusterId)
.cascading(true)
.delete();
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
new file mode 100644
index 0000000..36cd663
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.util.function.TriFunction;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/** Testing {@link ClusterClient} with pluggable cancel and stop-with-savepoint behavior; unsupported operations throw {@link UnsupportedOperationException}. */
+public class TestingClusterClient<T> implements ClusterClient<T> {
+
+ private Function<JobID, CompletableFuture<Acknowledge>> cancelFunction =
+ ignore -> CompletableFuture.completedFuture(Acknowledge.get()); // default: acknowledge immediately
+ private TriFunction<JobID, Boolean, String, CompletableFuture<String>>
+ stopWithSavepointFunction =
+ (ignore1, ignore2, savepointPath) ->
+ CompletableFuture.completedFuture(savepointPath); // default: echo the requested savepoint directory back
+
+ private final T clusterId;
+
+ public TestingClusterClient(T clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ public void setCancelFunction(Function<JobID, CompletableFuture<Acknowledge>> cancelFunction) {
+ this.cancelFunction = cancelFunction;
+ }
+
+ public void setStopWithSavepointFunction(
+ TriFunction<JobID, Boolean, String, CompletableFuture<String>>
+ stopWithSavepointFunction) {
+ this.stopWithSavepointFunction = stopWithSavepointFunction;
+ }
+
+ @Override
+ public T getClusterId() {
+ return clusterId;
+ }
+
+ @Override
+ public Configuration getFlinkConfiguration() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void shutDownCluster() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getWebInterfaceURL() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> cancel(JobID jobId) {
+ return cancelFunction.apply(jobId); // delegates to the configurable cancel behavior
+ }
+
+ @Override
+ public CompletableFuture<String> cancelWithSavepoint(
+ JobID jobId, @Nullable String savepointDirectory) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<String> stopWithSavepoint(
+ JobID jobId, boolean advanceToEndOfEventTime, @Nullable String savepointDirectory) {
+ return stopWithSavepointFunction.apply(jobId, advanceToEndOfEventTime, savepointDirectory); // delegates to the configurable stop-with-savepoint behavior
+ }
+
+ @Override
+ public CompletableFuture<String> triggerSavepoint(
+ JobID jobId, @Nullable String savepointDirectory) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
+ JobID jobId, OperatorID operatorId, CoordinationRequest request) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() {} // nothing to release
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index a7f7f36..8f99806 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -22,10 +22,12 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.junit.jupiter.api.Test;
@@ -33,6 +35,7 @@ import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** @link JobStatusObserver unit tests */
public class JobReconcilerTest {
@@ -47,19 +50,7 @@ public class JobReconcilerTest {
reconciler.reconcile("test", deployment, config);
List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs();
- assertEquals(1, runningJobs.size());
- assertNull(runningJobs.get(0).f0);
- deployment
- .getStatus()
- .getReconciliationStatus()
- .setLastReconciledSpec(deployment.getSpec());
-
- JobStatus jobStatus = new JobStatus();
- jobStatus.setJobName(runningJobs.get(0).f1.getJobName());
- jobStatus.setJobId(runningJobs.get(0).f1.getJobId().toHexString());
- jobStatus.setState("RUNNING");
-
- deployment.getStatus().setJobStatus(jobStatus);
+ verifyAndSetRunningJobsToStatus(deployment, runningJobs);
// Test stateless upgrade
FlinkDeployment statelessUpgrade = TestUtils.clone(deployment);
@@ -87,4 +78,67 @@ public class JobReconcilerTest {
assertEquals(1, runningJobs.size());
assertEquals("savepoint_0", runningJobs.get(0).f0);
}
+
+ @Test
+ public void testUpgradeModeChangeFromSavepointToLastState() throws Exception {
+ final String expectedSavepointPath = "savepoint_0";
+ final TestingFlinkService flinkService = new TestingFlinkService();
+
+ final JobReconciler reconciler = new JobReconciler(null, flinkService);
+ final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ final Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
+
+ reconciler.reconcile("test", deployment, config); // first reconcile; exactly one running job is asserted by the helper below
+ List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs();
+ verifyAndSetRunningJobsToStatus(deployment, runningJobs);
+
+ // Suspend FlinkDeployment with savepoint upgrade mode
+ deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+ deployment.getSpec().getJob().setState(JobState.SUSPENDED);
+ deployment.getSpec().setImage("new-image-1");
+
+ reconciler.reconcile("test", deployment, config);
+ assertEquals(0, flinkService.listJobs().size()); // suspending must leave no running job
+ assertTrue(
+ JobState.SUSPENDED
+ .name()
+ .equalsIgnoreCase(deployment.getStatus().getJobStatus().getState()));
+ assertEquals(
+ expectedSavepointPath,
+ deployment.getStatus().getJobStatus().getSavepointLocation());
+
+ // Resume FlinkDeployment with last-state upgrade mode
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .getLastReconciledSpec()
+ .getJob()
+ .setState(JobState.SUSPENDED); // presumably makes the reconciler treat the job as currently suspended - TODO confirm against JobReconciler#reconcile
+ deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+ deployment.getSpec().getJob().setState(JobState.RUNNING);
+ deployment.getSpec().setImage("new-image-2");
+
+ reconciler.reconcile("test", deployment, config);
+ runningJobs = flinkService.listJobs();
+ assertEquals(expectedSavepointPath, config.get(SavepointConfigOptions.SAVEPOINT_PATH));
+ assertEquals(1, runningJobs.size());
+ assertEquals(expectedSavepointPath, runningJobs.get(0).f0); // f0 presumably holds the savepoint the job was started from - verify against TestingFlinkService
+ }
+
+ private void verifyAndSetRunningJobsToStatus( // Asserts exactly one running job whose f0 is null, then records the spec and a RUNNING job status on the deployment.
+ FlinkDeployment deployment, List<Tuple2<String, JobStatusMessage>> runningJobs) {
+ assertEquals(1, runningJobs.size());
+ assertNull(runningJobs.get(0).f0);
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .setLastReconciledSpec(TestUtils.clone(deployment.getSpec())); // stores a clone; presumably so later edits to the live spec do not alter the recorded one
+
+ JobStatus jobStatus = new JobStatus();
+ jobStatus.setJobName(runningJobs.get(0).f1.getJobName());
+ jobStatus.setJobId(runningJobs.get(0).f1.getJobId().toHexString());
+ jobStatus.setState("RUNNING");
+
+ deployment.getStatus().setJobStatus(jobStatus);
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
new file mode 100644
index 0000000..594aeea
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
+import org.apache.flink.kubernetes.operator.TestingClusterClient;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for {@link FlinkService}. */
+@EnableKubernetesMockClient(crud = true)
+public class FlinkServiceTest {
+ KubernetesClient client; // never assigned in this class; presumably injected by the @EnableKubernetesMockClient extension
+ private final Configuration configuration = new Configuration();
+ private static final String CLUSTER_ID = "testing-flink-cluster";
+ private static final String TESTING_NAMESPACE = "test";
+
+ @BeforeEach
+ public void setup() {
+ configuration.set(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
+ configuration.set(KubernetesConfigOptions.NAMESPACE, TESTING_NAMESPACE);
+ }
+
+ @Test
+ public void testCancelJobWithStatelessUpgradeMode() throws Exception {
+ final TestingClusterClient<String> testingClusterClient =
+ new TestingClusterClient<>(CLUSTER_ID);
+ final CompletableFuture<JobID> cancelFuture = new CompletableFuture<>(); // captures the job id handed to cancel()
+ testingClusterClient.setCancelFunction(
+ jobID -> {
+ cancelFuture.complete(jobID);
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ });
+
+ final FlinkService flinkService = createFlinkService(testingClusterClient);
+
+ final JobID jobID = JobID.generate();
+ Optional<String> result =
+ flinkService.cancelJob(jobID, UpgradeMode.STATELESS, configuration);
+ assertTrue(cancelFuture.isDone());
+ assertEquals(jobID, cancelFuture.get());
+ assertFalse(result.isPresent()); // stateless cancel is expected to return no savepoint path
+ }
+
+ @Test
+ public void testCancelJobWithSavepointUpgradeMode() throws Exception {
+ final TestingClusterClient<String> testingClusterClient =
+ new TestingClusterClient<>(CLUSTER_ID);
+ final CompletableFuture<Tuple3<JobID, Boolean, String>> stopWithSavepointFuture =
+ new CompletableFuture<>(); // captures the arguments handed to stopWithSavepoint()
+ final String savepointPath = "file:///path/of/svp-1";
+ testingClusterClient.setStopWithSavepointFunction(
+ (jobID, advanceToEndOfEventTime, savepointDir) -> {
+ stopWithSavepointFuture.complete(
+ new Tuple3<>(jobID, advanceToEndOfEventTime, savepointDir));
+ return CompletableFuture.completedFuture(savepointPath);
+ });
+
+ final FlinkService flinkService = createFlinkService(testingClusterClient);
+
+ final JobID jobID = JobID.generate();
+ Optional<String> result =
+ flinkService.cancelJob(jobID, UpgradeMode.SAVEPOINT, configuration);
+ assertTrue(stopWithSavepointFuture.isDone());
+ assertEquals(jobID, stopWithSavepointFuture.get().f0);
+ assertFalse(stopWithSavepointFuture.get().f1);
+ assertNull(stopWithSavepointFuture.get().f2); // expects that no savepoint directory was passed
+ assertTrue(result.isPresent());
+ assertEquals(savepointPath, result.get());
+ }
+
+ @Test
+ public void testCancelJobWithLastStateUpgradeMode() throws Exception {
+ configuration.set(
+ HighAvailabilityOptions.HA_MODE,
+ KubernetesHaServicesFactory.class.getCanonicalName()); // HA must be enabled for a last-state cancel to be accepted
+ final TestingClusterClient<String> testingClusterClient =
+ new TestingClusterClient<>(CLUSTER_ID);
+ final FlinkService flinkService = createFlinkService(testingClusterClient);
+
+ client.apps()
+ .deployments()
+ .inNamespace(TESTING_NAMESPACE)
+ .create(createTestingDeployment());
+ assertNotNull(
+ client.apps()
+ .deployments()
+ .inNamespace(TESTING_NAMESPACE)
+ .withName(CLUSTER_ID)
+ .get()); // precondition: the Deployment exists before cancelling
+ final JobID jobID = JobID.generate();
+ Optional<String> result =
+ flinkService.cancelJob(jobID, UpgradeMode.LAST_STATE, configuration);
+ assertFalse(result.isPresent()); // last-state cancel is expected to return no savepoint path
+ assertNull(
+ client.apps()
+ .deployments()
+ .inNamespace(TESTING_NAMESPACE)
+ .withName(CLUSTER_ID)
+ .get()); // the cluster Deployment must have been deleted
+ }
+
+ @Test
+ public void testCancelJobWithLastStateUpgradeModeWhenHADisabled() {
+ configuration.set(HighAvailabilityOptions.HA_MODE, "None");
+ final TestingClusterClient<String> testingClusterClient =
+ new TestingClusterClient<>(CLUSTER_ID);
+ final FlinkService flinkService = createFlinkService(testingClusterClient);
+
+ final JobID jobID = JobID.generate();
+ assertThrows(
+ InvalidDeploymentException.class,
+ () -> flinkService.cancelJob(jobID, UpgradeMode.LAST_STATE, configuration));
+ }
+
+ private FlinkService createFlinkService(ClusterClient<String> clusterClient) {
+ return new FlinkService((NamespacedKubernetesClient) client) { // anonymous subclass that substitutes the supplied testing client
+ @Override
+ protected ClusterClient<String> getClusterClient(Configuration config) {
+ return clusterClient;
+ }
+ };
+ }
+
+ private Deployment createTestingDeployment() { // builds a Deployment named CLUSTER_ID in TESTING_NAMESPACE with an empty spec
+ return new DeploymentBuilder()
+ .withNewMetadata()
+ .withName(CLUSTER_ID)
+ .withNamespace(TESTING_NAMESPACE)
+ .endMetadata()
+ .withNewSpec()
+ .endSpec()
+ .build();
+ }
+}