You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2023/01/01 16:15:56 UTC
[flink-kubernetes-operator] 03/03: [FLINK-30527] Validate flinkVersion change for suspended jobs
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 7f65de716cb7a2d6ac158cd01f1ee2923b634795
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Dec 29 11:28:24 2022 +0100
[FLINK-30527] Validate flinkVersion change for suspended jobs
---
.../operator/validation/DefaultValidator.java | 25 +++++++++--
.../operator/validation/DefaultValidatorTest.java | 49 ++++++++++++++++++++++
2 files changed, 70 insertions(+), 4 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 05fdfaa8..91ab375e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -25,7 +25,6 @@ import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
-import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobManagerSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
@@ -79,7 +78,7 @@ public class DefaultValidator implements FlinkResourceValidator {
}
return firstPresent(
validateDeploymentName(deployment.getMetadata().getName()),
- validateFlinkVersion(spec.getFlinkVersion()),
+ validateFlinkVersion(deployment),
validateFlinkDeploymentConfig(effectiveConfig),
validateIngress(
spec.getIngress(),
@@ -114,10 +113,28 @@ public class DefaultValidator implements FlinkResourceValidator {
return Optional.empty();
}
- private Optional<String> validateFlinkVersion(FlinkVersion version) {
- if (version == null) {
+ private Optional<String> validateFlinkVersion(FlinkDeployment deployment) {
+ var spec = deployment.getSpec();
+ if (spec.getFlinkVersion() == null) {
return Optional.of("Flink Version must be defined.");
}
+
+ var lastReconciledSpec =
+ deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+
+ if (lastReconciledSpec != null
+ && lastReconciledSpec.getJob() != null
+ && spec.getJob() != null
+ && spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
+ var lastJob = lastReconciledSpec.getJob();
+ if (lastJob.getState() == JobState.SUSPENDED
+ && lastJob.getUpgradeMode() == UpgradeMode.LAST_STATE
+ && lastReconciledSpec.getFlinkVersion() != spec.getFlinkVersion()) {
+ return Optional.of(
+ "Changing flinkVersion after last-state suspend is not allowed. Restore your cluster with the current flinkVersion and perform the version upgrade afterwards.");
+ }
+ }
+
return Optional.empty();
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index a8cb5fa5..7036618e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -44,8 +44,11 @@ import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptio
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.utils.Constants;
+import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import javax.annotation.Nullable;
@@ -447,6 +450,52 @@ public class DefaultValidatorTest {
validatorWithDefaultConfig);
}
+ @ParameterizedTest
+ @EnumSource(UpgradeMode.class)
+ public void testFlinkVersionChangeValidation(UpgradeMode toUpgradeMode) {
+ var lastStateVersionChange =
+ createFlinkVersionChange(UpgradeMode.LAST_STATE, toUpgradeMode, JobState.SUSPENDED);
+ if (toUpgradeMode == UpgradeMode.STATELESS) {
+ testSuccess(lastStateVersionChange);
+ } else {
+ testError(
+ lastStateVersionChange,
+ "Changing flinkVersion after last-state suspend is not allowed.");
+ }
+
+ // Make sure validation always succeeds for running jobs
+ for (UpgradeMode fromUpgradeMode : UpgradeMode.values()) {
+ testSuccess(createFlinkVersionChange(fromUpgradeMode, toUpgradeMode, JobState.RUNNING));
+ }
+
+ // We should allow changing version after savepoint/stateless suspend
+ testSuccess(
+ createFlinkVersionChange(UpgradeMode.SAVEPOINT, toUpgradeMode, JobState.SUSPENDED));
+ testSuccess(
+ createFlinkVersionChange(UpgradeMode.STATELESS, toUpgradeMode, JobState.SUSPENDED));
+ }
+
+ @NotNull
+ private Consumer<FlinkDeployment> createFlinkVersionChange(
+ UpgradeMode fromUpgrade, UpgradeMode toUpgrade, JobState fromState) {
+ return dep -> {
+ var spec = dep.getSpec();
+ spec.setFlinkVersion(FlinkVersion.v1_15);
+ spec.getJob().setUpgradeMode(toUpgrade);
+
+ var suspendSpec = ReconciliationUtils.clone(spec);
+
+ // Last reconciled spec: uses the given fromUpgrade mode and fromState, with a different Flink version
+ suspendSpec.getJob().setUpgradeMode(fromUpgrade);
+ suspendSpec.getJob().setState(fromState);
+ suspendSpec.setFlinkVersion(FlinkVersion.v1_14);
+
+ dep.getStatus()
+ .getReconciliationStatus()
+ .serializeAndSetLastReconciledSpec(suspendSpec, dep);
+ };
+ }
+
private void testSuccess(Consumer<FlinkDeployment> deploymentModifier) {
testSuccess(deploymentModifier, validator);
}