You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2022/02/25 08:01:08 UTC

[flink-kubernetes-operator] branch main updated (85d997e -> baaed88)

This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git.


    from 85d997e  [FLINK-26136] Extract shared deployment validation logic
     new 2e6e978  [FLINK-26141] Support last-state upgrade mode
     new baaed88  [FLINK-26141] Add e2e test to guard last state upgrade

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/ci.yml                           |   5 +-
 e2e-tests/data/cr.yaml                             |   1 +
 ...pplication_ha.sh => test_last_state_upgrade.sh} |  30 ++--
 .../operator/reconciler/JobReconciler.java         |  57 +++----
 .../kubernetes/operator/service/FlinkService.java  |  15 +-
 .../kubernetes/operator/utils/FlinkUtils.java      |  12 +-
 .../kubernetes/operator/TestingClusterClient.java  | 149 ++++++++++++++++++
 .../operator/reconciler/JobReconcilerTest.java     |  80 ++++++++--
 .../operator/service/FlinkServiceTest.java         | 175 +++++++++++++++++++++
 9 files changed, 467 insertions(+), 57 deletions(-)
 copy e2e-tests/{test_kubernetes_application_ha.sh => test_last_state_upgrade.sh} (66%)
 create mode 100644 flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
 create mode 100644 flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java

[flink-kubernetes-operator] 01/02: [FLINK-26141] Support last-state upgrade mode

Posted by wa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 2e6e978abd499d1b534e069b1d296f33d4300be9
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Thu Feb 24 15:47:41 2022 +0800

    [FLINK-26141] Support last-state upgrade mode
---
 .../operator/reconciler/JobReconciler.java         |  57 +++----
 .../kubernetes/operator/service/FlinkService.java  |  15 +-
 .../kubernetes/operator/utils/FlinkUtils.java      |  12 +-
 .../kubernetes/operator/TestingClusterClient.java  | 149 ++++++++++++++++++
 .../operator/reconciler/JobReconcilerTest.java     |  80 ++++++++--
 .../operator/service/FlinkServiceTest.java         | 175 +++++++++++++++++++++
 6 files changed, 444 insertions(+), 44 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index dee22fb..fb599e8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -25,7 +25,6 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -80,23 +79,21 @@ public class JobReconciler {
                     upgradeFlinkJob(flinkApp, effectiveConfig);
                 }
                 if (desiredJobState.equals(JobState.SUSPENDED)) {
-                    if (upgradeMode == UpgradeMode.STATELESS) {
-                        cancelJob(flinkApp, effectiveConfig);
-                    } else {
-                        suspendJob(flinkApp, effectiveConfig);
-                    }
+                    printCancelLogs(upgradeMode, flinkApp.getMetadata().getName());
+                    cancelJob(flinkApp, upgradeMode, effectiveConfig);
                 }
             }
-            if (currentJobState == JobState.SUSPENDED) {
-                if (desiredJobState == JobState.RUNNING) {
-                    if (upgradeMode == UpgradeMode.STATELESS) {
-                        deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
-                    } else if (upgradeMode == UpgradeMode.SAVEPOINT) {
-                        restoreFromLastSavepoint(flinkApp, effectiveConfig);
-                    } else {
-                        throw new InvalidDeploymentException(
-                                "Only savepoint and stateless strategies are supported at the moment.");
-                    }
+            if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
+                if (upgradeMode == UpgradeMode.STATELESS) {
+                    deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
+                } else if (upgradeMode == UpgradeMode.SAVEPOINT) {
+                    restoreFromLastSavepoint(flinkApp, effectiveConfig);
+                } else if (upgradeMode == UpgradeMode.LAST_STATE) {
+                    final String savepointLocation =
+                            flinkApp.getStatus().getJobStatus().getSavepointLocation();
+                    // Upgrade mode changes from savepoint -> last-state
+                    deployFlinkJob(
+                            flinkApp, effectiveConfig, Optional.ofNullable(savepointLocation));
                 }
             }
         }
@@ -116,7 +113,8 @@ public class JobReconciler {
     private void upgradeFlinkJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
             throws Exception {
         LOG.info("Upgrading running job");
-        Optional<String> savepoint = cancelJob(flinkApp, effectiveConfig);
+        final Optional<String> savepoint =
+                cancelJob(flinkApp, flinkApp.getSpec().getJob().getUpgradeMode(), effectiveConfig);
         deployFlinkJob(flinkApp, effectiveConfig, savepoint);
     }
 
@@ -126,17 +124,20 @@ public class JobReconciler {
         deployFlinkJob(flinkApp, effectiveConfig, Optional.of(jobStatus.getSavepointLocation()));
     }
 
-    private Optional<String> suspendJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
-            throws Exception {
-        LOG.info("Suspending {}", flinkApp.getMetadata().getName());
-        return cancelJob(flinkApp, UpgradeMode.SAVEPOINT, effectiveConfig);
-    }
-
-    private Optional<String> cancelJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
-            throws Exception {
-        LOG.info("Cancelling {}", flinkApp.getMetadata().getName());
-        UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
-        return cancelJob(flinkApp, upgradeMode, effectiveConfig);
+    private void printCancelLogs(UpgradeMode upgradeMode, String name) {
+        switch (upgradeMode) {
+            case STATELESS:
+                LOG.info("Cancelling {}", name);
+                break;
+            case SAVEPOINT:
+                LOG.info("Suspending {}", name);
+                break;
+            case LAST_STATE:
+                LOG.info("Cancelling {} with last state retained", name);
+                break;
+            default:
+                throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
+        }
     }
 
     private Optional<String> cancelJob(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index c8edf01..8b9af3b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.service;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.cli.ApplicationDeployer;
 import org.apache.flink.client.deployment.ClusterClientFactory;
@@ -34,9 +35,11 @@ import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import org.slf4j.Logger;
@@ -97,7 +100,8 @@ public class FlinkService {
         }
     }
 
-    private ClusterClient<String> getClusterClient(Configuration config) throws Exception {
+    @VisibleForTesting
+    protected ClusterClient<String> getClusterClient(Configuration config) throws Exception {
         final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
         final String namespace = config.get(KubernetesConfigOptions.NAMESPACE);
         final int port = config.getInteger(RestOptions.PORT);
@@ -125,6 +129,15 @@ public class FlinkService {
                                     .get(1, TimeUnit.MINUTES);
                     savepointOpt = Optional.of(savepoint);
                     break;
+                case LAST_STATE:
+                    if (!HighAvailabilityMode.isHighAvailabilityModeActivated(conf)) {
+                        throw new InvalidDeploymentException(
+                                "Job could not be upgraded with last-state while HA disabled");
+                    }
+                    final String namespace = conf.getString(KubernetesConfigOptions.NAMESPACE);
+                    final String clusterId = clusterClient.getClusterId();
+                    FlinkUtils.deleteCluster(namespace, clusterId, kubernetesClient);
+                    break;
                 default:
                     throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
             }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index fd8061a..8a0ef5c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -109,11 +109,19 @@ public class FlinkUtils {
     }
 
     public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+        deleteCluster(
+                flinkApp.getMetadata().getNamespace(),
+                flinkApp.getMetadata().getName(),
+                kubernetesClient);
+    }
+
+    public static void deleteCluster(
+            String namespace, String clusterId, KubernetesClient kubernetesClient) {
         kubernetesClient
                 .apps()
                 .deployments()
-                .inNamespace(flinkApp.getMetadata().getNamespace())
-                .withName(flinkApp.getMetadata().getName())
+                .inNamespace(namespace)
+                .withName(clusterId)
                 .cascading(true)
                 .delete();
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
new file mode 100644
index 0000000..36cd663
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.util.function.TriFunction;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/** {@link ClusterClient} implementation used for testing. */
+public class TestingClusterClient<T> implements ClusterClient<T> {
+
+    private Function<JobID, CompletableFuture<Acknowledge>> cancelFunction =
+            ignore -> CompletableFuture.completedFuture(Acknowledge.get());
+    private TriFunction<JobID, Boolean, String, CompletableFuture<String>>
+            stopWithSavepointFunction =
+                    (ignore1, ignore2, savepointPath) ->
+                            CompletableFuture.completedFuture(savepointPath);
+
+    private final T clusterId;
+
+    public TestingClusterClient(T clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    public void setCancelFunction(Function<JobID, CompletableFuture<Acknowledge>> cancelFunction) {
+        this.cancelFunction = cancelFunction;
+    }
+
+    public void setStopWithSavepointFunction(
+            TriFunction<JobID, Boolean, String, CompletableFuture<String>>
+                    stopWithSavepointFunction) {
+        this.stopWithSavepointFunction = stopWithSavepointFunction;
+    }
+
+    @Override
+    public T getClusterId() {
+        return clusterId;
+    }
+
+    @Override
+    public Configuration getFlinkConfiguration() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void shutDownCluster() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getWebInterfaceURL() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> cancel(JobID jobId) {
+        return cancelFunction.apply(jobId);
+    }
+
+    @Override
+    public CompletableFuture<String> cancelWithSavepoint(
+            JobID jobId, @Nullable String savepointDirectory) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(
+            JobID jobId, boolean advanceToEndOfEventTime, @Nullable String savepointDirectory) {
+        return stopWithSavepointFunction.apply(jobId, advanceToEndOfEventTime, savepointDirectory);
+    }
+
+    @Override
+    public CompletableFuture<String> triggerSavepoint(
+            JobID jobId, @Nullable String savepointDirectory) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
+            JobID jobId, OperatorID operatorId, CoordinationRequest request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void close() {}
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index a7f7f36..8f99806 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -22,10 +22,12 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
 import org.junit.jupiter.api.Test;
 
@@ -33,6 +35,7 @@ import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** @link JobStatusObserver unit tests */
 public class JobReconcilerTest {
@@ -47,19 +50,7 @@ public class JobReconcilerTest {
 
         reconciler.reconcile("test", deployment, config);
         List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs();
-        assertEquals(1, runningJobs.size());
-        assertNull(runningJobs.get(0).f0);
-        deployment
-                .getStatus()
-                .getReconciliationStatus()
-                .setLastReconciledSpec(deployment.getSpec());
-
-        JobStatus jobStatus = new JobStatus();
-        jobStatus.setJobName(runningJobs.get(0).f1.getJobName());
-        jobStatus.setJobId(runningJobs.get(0).f1.getJobId().toHexString());
-        jobStatus.setState("RUNNING");
-
-        deployment.getStatus().setJobStatus(jobStatus);
+        verifyAndSetRunningJobsToStatus(deployment, runningJobs);
 
         // Test stateless upgrade
         FlinkDeployment statelessUpgrade = TestUtils.clone(deployment);
@@ -87,4 +78,67 @@ public class JobReconcilerTest {
         assertEquals(1, runningJobs.size());
         assertEquals("savepoint_0", runningJobs.get(0).f0);
     }
+
+    @Test
+    public void testUpgradeModeChangeFromSavepointToLastState() throws Exception {
+        final String expectedSavepointPath = "savepoint_0";
+        final TestingFlinkService flinkService = new TestingFlinkService();
+
+        final JobReconciler reconciler = new JobReconciler(null, flinkService);
+        final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        final Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
+
+        reconciler.reconcile("test", deployment, config);
+        List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs();
+        verifyAndSetRunningJobsToStatus(deployment, runningJobs);
+
+        // Suspend FlinkDeployment with savepoint upgrade mode
+        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+        deployment.getSpec().getJob().setState(JobState.SUSPENDED);
+        deployment.getSpec().setImage("new-image-1");
+
+        reconciler.reconcile("test", deployment, config);
+        assertEquals(0, flinkService.listJobs().size());
+        assertTrue(
+                JobState.SUSPENDED
+                        .name()
+                        .equalsIgnoreCase(deployment.getStatus().getJobStatus().getState()));
+        assertEquals(
+                expectedSavepointPath,
+                deployment.getStatus().getJobStatus().getSavepointLocation());
+
+        // Resume FlinkDeployment with last-state upgrade mode
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .getLastReconciledSpec()
+                .getJob()
+                .setState(JobState.SUSPENDED);
+        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+        deployment.getSpec().getJob().setState(JobState.RUNNING);
+        deployment.getSpec().setImage("new-image-2");
+
+        reconciler.reconcile("test", deployment, config);
+        runningJobs = flinkService.listJobs();
+        assertEquals(expectedSavepointPath, config.get(SavepointConfigOptions.SAVEPOINT_PATH));
+        assertEquals(1, runningJobs.size());
+        assertEquals(expectedSavepointPath, runningJobs.get(0).f0);
+    }
+
+    private void verifyAndSetRunningJobsToStatus(
+            FlinkDeployment deployment, List<Tuple2<String, JobStatusMessage>> runningJobs) {
+        assertEquals(1, runningJobs.size());
+        assertNull(runningJobs.get(0).f0);
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .setLastReconciledSpec(TestUtils.clone(deployment.getSpec()));
+
+        JobStatus jobStatus = new JobStatus();
+        jobStatus.setJobName(runningJobs.get(0).f1.getJobName());
+        jobStatus.setJobId(runningJobs.get(0).f1.getJobId().toHexString());
+        jobStatus.setState("RUNNING");
+
+        deployment.getStatus().setJobStatus(jobStatus);
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
new file mode 100644
index 0000000..594aeea
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
+import org.apache.flink.kubernetes.operator.TestingClusterClient;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** {@link FlinkService} unit tests. */
+@EnableKubernetesMockClient(crud = true)
+public class FlinkServiceTest {
+    KubernetesClient client;
+    private final Configuration configuration = new Configuration();
+    private static final String CLUSTER_ID = "testing-flink-cluster";
+    private static final String TESTING_NAMESPACE = "test";
+
+    @BeforeEach
+    public void setup() {
+        configuration.set(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
+        configuration.set(KubernetesConfigOptions.NAMESPACE, TESTING_NAMESPACE);
+    }
+
+    @Test
+    public void testCancelJobWithStatelessUpgradeMode() throws Exception {
+        final TestingClusterClient<String> testingClusterClient =
+                new TestingClusterClient<>(CLUSTER_ID);
+        final CompletableFuture<JobID> cancelFuture = new CompletableFuture<>();
+        testingClusterClient.setCancelFunction(
+                jobID -> {
+                    cancelFuture.complete(jobID);
+                    return CompletableFuture.completedFuture(Acknowledge.get());
+                });
+
+        final FlinkService flinkService = createFlinkService(testingClusterClient);
+
+        final JobID jobID = JobID.generate();
+        Optional<String> result =
+                flinkService.cancelJob(jobID, UpgradeMode.STATELESS, configuration);
+        assertTrue(cancelFuture.isDone());
+        assertEquals(jobID, cancelFuture.get());
+        assertFalse(result.isPresent());
+    }
+
+    @Test
+    public void testCancelJobWithSavepointUpgradeMode() throws Exception {
+        final TestingClusterClient<String> testingClusterClient =
+                new TestingClusterClient<>(CLUSTER_ID);
+        final CompletableFuture<Tuple3<JobID, Boolean, String>> stopWithSavepointFuture =
+                new CompletableFuture<>();
+        final String savepointPath = "file:///path/of/svp-1";
+        testingClusterClient.setStopWithSavepointFunction(
+                (jobID, advanceToEndOfEventTime, savepointDir) -> {
+                    stopWithSavepointFuture.complete(
+                            new Tuple3<>(jobID, advanceToEndOfEventTime, savepointDir));
+                    return CompletableFuture.completedFuture(savepointPath);
+                });
+
+        final FlinkService flinkService = createFlinkService(testingClusterClient);
+
+        final JobID jobID = JobID.generate();
+        Optional<String> result =
+                flinkService.cancelJob(jobID, UpgradeMode.SAVEPOINT, configuration);
+        assertTrue(stopWithSavepointFuture.isDone());
+        assertEquals(jobID, stopWithSavepointFuture.get().f0);
+        assertFalse(stopWithSavepointFuture.get().f1);
+        assertNull(stopWithSavepointFuture.get().f2);
+        assertTrue(result.isPresent());
+        assertEquals(savepointPath, result.get());
+    }
+
+    @Test
+    public void testCancelJobWithLastStateUpgradeMode() throws Exception {
+        configuration.set(
+                HighAvailabilityOptions.HA_MODE,
+                KubernetesHaServicesFactory.class.getCanonicalName());
+        final TestingClusterClient<String> testingClusterClient =
+                new TestingClusterClient<>(CLUSTER_ID);
+        final FlinkService flinkService = createFlinkService(testingClusterClient);
+
+        client.apps()
+                .deployments()
+                .inNamespace(TESTING_NAMESPACE)
+                .create(createTestingDeployment());
+        assertNotNull(
+                client.apps()
+                        .deployments()
+                        .inNamespace(TESTING_NAMESPACE)
+                        .withName(CLUSTER_ID)
+                        .get());
+        final JobID jobID = JobID.generate();
+        Optional<String> result =
+                flinkService.cancelJob(jobID, UpgradeMode.LAST_STATE, configuration);
+        assertFalse(result.isPresent());
+        assertNull(
+                client.apps()
+                        .deployments()
+                        .inNamespace(TESTING_NAMESPACE)
+                        .withName(CLUSTER_ID)
+                        .get());
+    }
+
+    @Test
+    public void testCancelJobWithLastStateUpgradeModeWhenHADisabled() {
+        configuration.set(HighAvailabilityOptions.HA_MODE, "None");
+        final TestingClusterClient<String> testingClusterClient =
+                new TestingClusterClient<>(CLUSTER_ID);
+        final FlinkService flinkService = createFlinkService(testingClusterClient);
+
+        final JobID jobID = JobID.generate();
+        assertThrows(
+                InvalidDeploymentException.class,
+                () -> flinkService.cancelJob(jobID, UpgradeMode.LAST_STATE, configuration));
+    }
+
+    private FlinkService createFlinkService(ClusterClient<String> clusterClient) {
+        return new FlinkService((NamespacedKubernetesClient) client) {
+            @Override
+            protected ClusterClient<String> getClusterClient(Configuration config) {
+                return clusterClient;
+            }
+        };
+    }
+
+    private Deployment createTestingDeployment() {
+        return new DeploymentBuilder()
+                .withNewMetadata()
+                .withName(CLUSTER_ID)
+                .withNamespace(TESTING_NAMESPACE)
+                .endMetadata()
+                .withNewSpec()
+                .endSpec()
+                .build();
+    }
+}

[flink-kubernetes-operator] 02/02: [FLINK-26141] Add e2e test to guard last state upgrade

Posted by wa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit baaed88b21a27df0eb26a5bdce4516ec3c7510c1
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Thu Feb 24 16:32:05 2022 +0800

    [FLINK-26141] Add e2e test to guard last state upgrade
    
    This closes #22.
---
 .github/workflows/ci.yml             |  5 ++-
 e2e-tests/data/cr.yaml               |  1 +
 e2e-tests/test_last_state_upgrade.sh | 68 ++++++++++++++++++++++++++++++++++++
 3 files changed, 73 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 343142a..6f3345a 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -106,7 +106,10 @@ jobs:
           kubectl get pods
       - name: Run Flink e2e tests
         run: |
-          ./e2e-tests/test_kubernetes_application_ha.sh
+          ls e2e-tests/test_*.sh | while read script_test;do \
+            echo "Running $script_test"
+            bash $script_test || exit 1
+          done
       - name: Stop the operator
         run: |
           helm uninstall flink-operator
diff --git a/e2e-tests/data/cr.yaml b/e2e-tests/data/cr.yaml
index 936a1de..90069f9 100644
--- a/e2e-tests/data/cr.yaml
+++ b/e2e-tests/data/cr.yaml
@@ -78,6 +78,7 @@ spec:
     jarURI: local:///opt/flink/usrlib/myjob.jar
     entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
     parallelism: 2
+    upgradeMode: last-state
 
 ---
 apiVersion: v1
diff --git a/e2e-tests/test_last_state_upgrade.sh b/e2e-tests/test_last_state_upgrade.sh
new file mode 100755
index 0000000..20ca7d0
--- /dev/null
+++ b/e2e-tests/test_last_state_upgrade.sh
@@ -0,0 +1,68 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/utils.sh
+
+CLUSTER_ID="flink-example-statemachine"
+TIMEOUT=300
+
+function cleanup_and_exit() {
+    if [ "${TRAPPED_EXIT_CODE:-0}" != 0 ];then
+      debug_and_show_logs
+    fi
+    # Best-effort teardown: remove the CR, wait for its pods to go away, then drop the HA ConfigMaps.
+    kubectl delete -f e2e-tests/data/cr.yaml
+    kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}"
+    kubectl delete cm --selector="app=${CLUSTER_ID},configmap-type=high-availability"
+}
+
+function wait_for_jobmanager_running() {
+    retry_times 30 3 "kubectl get deploy/${CLUSTER_ID}" || exit 1
+    # retry_times (utils.sh, not shown) presumably polls until the Deployment exists - confirm.
+    kubectl wait --for=condition=Available --timeout=${TIMEOUT}s deploy/${CLUSTER_ID} || exit 1
+    jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')
+    # jm_pod_name is not declared local: the top-level script reads it after this function returns.
+    echo "Waiting for jobmanager pod ${jm_pod_name} ready."
+    kubectl wait --for=condition=Ready --timeout=${TIMEOUT}s pod/$jm_pod_name || exit 1
+    # Beyond pod readiness, also wait for the REST endpoint start-up message in the pod logs.
+    wait_for_logs $jm_pod_name "Rest endpoint listening at" ${TIMEOUT} || exit 1
+}
+
+on_exit cleanup_and_exit
+
+retry_times 5 30 "kubectl apply -f e2e-tests/data/cr.yaml" || exit 1
+
+wait_for_jobmanager_running
+
+wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
+# Remember the job id; the restore log line of the new JobManager is matched against it below.
+job_id=$(kubectl logs $jm_pod_name | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}')
+
+# Update the FlinkDeployment and trigger the last state upgrade
+kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"jobManager": {"resource": {"cpu": 0.51, "memory": "1024m"} } } }' || exit 1
+# Fail fast if the old pods are not removed in time instead of continuing against a stale cluster.
+kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}" || exit 1
+wait_for_jobmanager_running
+
+# Check the new JobManager recovering from latest successful checkpoint
+wait_for_logs $jm_pod_name "Restoring job $job_id from Checkpoint" ${TIMEOUT} || exit 1
+wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
+
+echo "Successfully run the last-state upgrade test"
+