You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/24 16:18:53 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-26836] Add sanity check for state.savepoints.dir when using savepoint upgrade mode
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new b49d27f [FLINK-26836] Add sanity check for state.savepoints.dir when using savepoint upgrade mode
b49d27f is described below
commit b49d27fb3df97b6f9349aaaed402abd9e4da6ee5
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Fri Mar 25 00:18:49 2022 +0800
[FLINK-26836] Add sanity check for state.savepoints.dir when using savepoint upgrade mode
---
.../validation/DefaultDeploymentValidator.java | 21 ++++++++++++--
.../controller/FlinkDeploymentControllerTest.java | 13 +++++++++
.../validation/DeploymentValidatorTest.java | 32 ++++++++++++++++++++++
3 files changed, 64 insertions(+), 2 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index 8e7d2b3..297ede0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.validation;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -34,6 +35,7 @@ import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.StringUtils;
import java.util.Map;
import java.util.Optional;
@@ -140,12 +142,27 @@ public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
return Optional.of("Jar URI must be defined");
}
+ Configuration configuration = Configuration.fromMap(confMap);
if (job.getUpgradeMode() == UpgradeMode.LAST_STATE
- && !HighAvailabilityMode.isHighAvailabilityModeActivated(
- Configuration.fromMap(confMap))) {
+ && !HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
return Optional.of("Job could not be upgraded with last-state while HA disabled");
}
+ if (StringUtils.isNullOrWhitespaceOnly(
+ configuration.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY))) {
+ if (job.getUpgradeMode() == UpgradeMode.SAVEPOINT) {
+ return Optional.of(
+ String.format(
+ "Job could not be upgraded with savepoint while config key[%s] is not set",
+ CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
+ } else if (job.getSavepointTriggerNonce() != null) {
+ return Optional.of(
+ String.format(
+ "Savepoint could not be manually triggered for the running job while config key[%s] is not set",
+ CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
+ }
+ }
+
return Optional.empty();
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index cc0bace..308c894 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.controller;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
@@ -266,6 +267,12 @@ public class FlinkDeploymentControllerTest {
FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
appCluster.getSpec().getJob().setInitialSavepointPath("s0");
+ appCluster
+ .getSpec()
+ .getFlinkConfiguration()
+ .put(
+ CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
+ "file:///flink-data/savepoints");
testController.reconcile(appCluster, TestUtils.createEmptyContext());
List<Tuple2<String, JobStatusMessage>> jobs = flinkService.listJobs();
@@ -395,6 +402,12 @@ public class FlinkDeploymentControllerTest {
appCluster = TestUtils.buildApplicationCluster();
appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+ appCluster
+ .getSpec()
+ .getFlinkConfiguration()
+ .put(
+ CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
+ "file:///flink-data/savepoints");
testUpgradeNotReadyCluster(appCluster, false);
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index 7d885ec..9c71671 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.validation;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
@@ -38,6 +39,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import static org.junit.Assert.assertTrue;
@@ -71,6 +73,26 @@ public class DeploymentValidatorTest {
},
"Job could not be upgraded with last-state while HA disabled");
+ testError(
+ dep -> {
+ dep.getSpec().setFlinkConfiguration(new HashMap<>());
+ dep.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+ },
+ String.format(
+ "Job could not be upgraded with savepoint while config key[%s] is not set",
+ CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
+
+ testError(
+ dep -> {
+ dep.getSpec().setFlinkConfiguration(new HashMap<>());
+ dep.getSpec()
+ .getJob()
+ .setSavepointTriggerNonce(ThreadLocalRandom.current().nextLong());
+ },
+ String.format(
+ "Savepoint could not be manually triggered for the running job while config key[%s] is not set",
+ CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
+
// Test conf validation
testSuccess(
dep ->
@@ -164,6 +186,11 @@ public class DeploymentValidatorTest {
.getJob()
.setState(JobState.SUSPENDED);
+ dep.getSpec()
+ .getFlinkConfiguration()
+ .put(
+ CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
+ "file:///flink-data/savepoints");
dep.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
});
@@ -182,6 +209,11 @@ public class DeploymentValidatorTest {
.getJob()
.setState(JobState.SUSPENDED);
+ dep.getSpec()
+ .getFlinkConfiguration()
+ .put(
+ CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
+ "file:///flink-data/savepoints");
dep.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
},
"Cannot perform savepoint restore without a valid savepoint");