You are viewing a plain-text version of this content. The canonical link for it ("here") was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2023/01/01 16:15:56 UTC

[flink-kubernetes-operator] 03/03: [FLINK-30527] Validate flinkVersion change for suspended jobs

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 7f65de716cb7a2d6ac158cd01f1ee2923b634795
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Dec 29 11:28:24 2022 +0100

    [FLINK-30527] Validate flinkVersion change for suspended jobs
---
 .../operator/validation/DefaultValidator.java      | 25 +++++++++--
 .../operator/validation/DefaultValidatorTest.java  | 49 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 4 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 05fdfaa8..91ab375e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -25,7 +25,6 @@ import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
-import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
 import org.apache.flink.kubernetes.operator.api.spec.JobManagerSpec;
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
@@ -79,7 +78,7 @@ public class DefaultValidator implements FlinkResourceValidator {
         }
         return firstPresent(
                 validateDeploymentName(deployment.getMetadata().getName()),
-                validateFlinkVersion(spec.getFlinkVersion()),
+                validateFlinkVersion(deployment),
                 validateFlinkDeploymentConfig(effectiveConfig),
                 validateIngress(
                         spec.getIngress(),
@@ -114,10 +113,28 @@ public class DefaultValidator implements FlinkResourceValidator {
         return Optional.empty();
     }
 
-    private Optional<String> validateFlinkVersion(FlinkVersion version) {
-        if (version == null) {
+    private Optional<String> validateFlinkVersion(FlinkDeployment deployment) {
+        var spec = deployment.getSpec();
+        if (spec.getFlinkVersion() == null) {
             return Optional.of("Flink Version must be defined.");
         }
+
+        var lastReconciledSpec =
+                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+
+        if (lastReconciledSpec != null
+                && lastReconciledSpec.getJob() != null
+                && spec.getJob() != null
+                && spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
+            var lastJob = lastReconciledSpec.getJob();
+            if (lastJob.getState() == JobState.SUSPENDED
+                    && lastJob.getUpgradeMode() == UpgradeMode.LAST_STATE
+                    && lastReconciledSpec.getFlinkVersion() != spec.getFlinkVersion()) {
+                return Optional.of(
+                        "Changing flinkVersion after last-state suspend is not allowed. Restore your cluster with the current flinkVersion and perform the version upgrade afterwards.");
+            }
+        }
+
         return Optional.empty();
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index a8cb5fa5..7036618e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -44,8 +44,11 @@ import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptio
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.utils.Constants;
 
+import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import javax.annotation.Nullable;
 
@@ -447,6 +450,52 @@ public class DefaultValidatorTest {
                 validatorWithDefaultConfig);
     }
 
+    @ParameterizedTest
+    @EnumSource(UpgradeMode.class)
+    public void testFlinkVersionChangeValidation(UpgradeMode toUpgradeMode) {
+        var lastStateVersionChange =
+                createFlinkVersionChange(UpgradeMode.LAST_STATE, toUpgradeMode, JobState.SUSPENDED);
+        if (toUpgradeMode == UpgradeMode.STATELESS) {
+            testSuccess(lastStateVersionChange);
+        } else {
+            testError(
+                    lastStateVersionChange,
+                    "Changing flinkVersion after last-state suspend is not allowed.");
+        }
+
+        // Make sure validation always succeeds for running jobs
+        for (UpgradeMode fromUpgradeMode : UpgradeMode.values()) {
+            testSuccess(createFlinkVersionChange(fromUpgradeMode, toUpgradeMode, JobState.RUNNING));
+        }
+
+        // We should allow changing version after savepoint/stateless suspend
+        testSuccess(
+                createFlinkVersionChange(UpgradeMode.SAVEPOINT, toUpgradeMode, JobState.SUSPENDED));
+        testSuccess(
+                createFlinkVersionChange(UpgradeMode.STATELESS, toUpgradeMode, JobState.SUSPENDED));
+    }
+
+    @NotNull
+    private Consumer<FlinkDeployment> createFlinkVersionChange(
+            UpgradeMode fromUpgrade, UpgradeMode toUpgrade, JobState fromState) {
+        return dep -> {
+            var spec = dep.getSpec();
+            spec.setFlinkVersion(FlinkVersion.v1_15);
+            spec.getJob().setUpgradeMode(toUpgrade);
+
+            var suspendSpec = ReconciliationUtils.clone(spec);
+
+            // Stopped with LAST_STATE mode with different Flink Version
+            suspendSpec.getJob().setUpgradeMode(fromUpgrade);
+            suspendSpec.getJob().setState(fromState);
+            suspendSpec.setFlinkVersion(FlinkVersion.v1_14);
+
+            dep.getStatus()
+                    .getReconciliationStatus()
+                    .serializeAndSetLastReconciledSpec(suspendSpec, dep);
+        };
+    }
+
     private void testSuccess(Consumer<FlinkDeployment> deploymentModifier) {
         testSuccess(deploymentModifier, validator);
     }