You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/24 16:18:53 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-26836] Add sanity check for state.savepoints.dir when using savepoint upgrade mode

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new b49d27f  [FLINK-26836] Add sanity check for state.savepoints.dir when using savepoint upgrade mode
b49d27f is described below

commit b49d27fb3df97b6f9349aaaed402abd9e4da6ee5
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Fri Mar 25 00:18:49 2022 +0800

    [FLINK-26836] Add sanity check for state.savepoints.dir when using savepoint upgrade mode
---
 .../validation/DefaultDeploymentValidator.java     | 21 ++++++++++++--
 .../controller/FlinkDeploymentControllerTest.java  | 13 +++++++++
 .../validation/DeploymentValidatorTest.java        | 32 ++++++++++++++++++++++
 3 files changed, 64 insertions(+), 2 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index 8e7d2b3..297ede0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.validation;
 
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -34,6 +35,7 @@ import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.StringUtils;
 
 import java.util.Map;
 import java.util.Optional;
@@ -140,12 +142,27 @@ public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
             return Optional.of("Jar URI must be defined");
         }
 
+        Configuration configuration = Configuration.fromMap(confMap);
         if (job.getUpgradeMode() == UpgradeMode.LAST_STATE
-                && !HighAvailabilityMode.isHighAvailabilityModeActivated(
-                        Configuration.fromMap(confMap))) {
+                && !HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
             return Optional.of("Job could not be upgraded with last-state while HA disabled");
         }
 
+        if (StringUtils.isNullOrWhitespaceOnly(
+                configuration.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY))) {
+            if (job.getUpgradeMode() == UpgradeMode.SAVEPOINT) {
+                return Optional.of(
+                        String.format(
+                                "Job could not be upgraded with savepoint while config key[%s] is not set",
+                                CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
+            } else if (job.getSavepointTriggerNonce() != null) {
+                return Optional.of(
+                        String.format(
+                                "Savepoint could not be manually triggered for the running job while config key[%s] is not set",
+                                CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
+            }
+        }
+
         return Optional.empty();
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index cc0bace..308c894 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.controller;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
@@ -266,6 +267,12 @@ public class FlinkDeploymentControllerTest {
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
         appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
         appCluster.getSpec().getJob().setInitialSavepointPath("s0");
+        appCluster
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(
+                        CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
+                        "file:///flink-data/savepoints");
 
         testController.reconcile(appCluster, TestUtils.createEmptyContext());
         List<Tuple2<String, JobStatusMessage>> jobs = flinkService.listJobs();
@@ -395,6 +402,12 @@ public class FlinkDeploymentControllerTest {
 
         appCluster = TestUtils.buildApplicationCluster();
         appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+        appCluster
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(
+                        CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
+                        "file:///flink-data/savepoints");
         testUpgradeNotReadyCluster(appCluster, false);
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index 7d885ec..9c71671 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.validation;
 
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
@@ -38,6 +39,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Consumer;
 
 import static org.junit.Assert.assertTrue;
@@ -71,6 +73,26 @@ public class DeploymentValidatorTest {
                 },
                 "Job could not be upgraded with last-state while HA disabled");
 
+        testError(
+                dep -> {
+                    dep.getSpec().setFlinkConfiguration(new HashMap<>());
+                    dep.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+                },
+                String.format(
+                        "Job could not be upgraded with savepoint while config key[%s] is not set",
+                        CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
+
+        testError(
+                dep -> {
+                    dep.getSpec().setFlinkConfiguration(new HashMap<>());
+                    dep.getSpec()
+                            .getJob()
+                            .setSavepointTriggerNonce(ThreadLocalRandom.current().nextLong());
+                },
+                String.format(
+                        "Savepoint could not be manually triggered for the running job while config key[%s] is not set",
+                        CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
+
         // Test conf validation
         testSuccess(
                 dep ->
@@ -164,6 +186,11 @@ public class DeploymentValidatorTest {
                             .getJob()
                             .setState(JobState.SUSPENDED);
 
+                    dep.getSpec()
+                            .getFlinkConfiguration()
+                            .put(
+                                    CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
+                                    "file:///flink-data/savepoints");
                     dep.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
                 });
 
@@ -182,6 +209,11 @@ public class DeploymentValidatorTest {
                             .getJob()
                             .setState(JobState.SUSPENDED);
 
+                    dep.getSpec()
+                            .getFlinkConfiguration()
+                            .put(
+                                    CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
+                                    "file:///flink-data/savepoints");
                     dep.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
                 },
                 "Cannot perform savepoint restore without a valid savepoint");