You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/05 07:49:46 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-28348] Add configurable flag to disable last-state fallback for savepoint upgrade

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 17723ff  [FLINK-28348] Add configurable flag to disable last-state fallback for savepoint upgrade
17723ff is described below

commit 17723ffc09b6813006044f50165bb6cf15427df7
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Mon Jul 4 19:42:26 2022 +0200

    [FLINK-28348] Add configurable flag to disable last-state fallback for savepoint upgrade
---
 .../kubernetes_operator_config_configuration.html  |  6 +++++
 .../config/KubernetesOperatorConfigOptions.java    |  7 +++++
 .../deployment/ApplicationReconciler.java          |  6 ++++-
 .../controller/FlinkDeploymentControllerTest.java  | 31 +++++++++++++++++++++-
 4 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index ec5edc6..cfc1ba9 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -74,6 +74,12 @@
             <td>Boolean</td>
             <td>Whether to ignore pending savepoint during job upgrade.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.upgrade.last-state-fallback.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Enables last-state fallback for savepoint upgrade mode. When the job is not running thus savepoint cannot be triggered but HA metadata is available for last state restore the operator can initiate the upgrade process when the flag is enabled.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.observer.progress-check.interval</h5></td>
             <td style="word-wrap: break-word;">10 s</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index a9f253b..aa6031c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -230,4 +230,11 @@ public class KubernetesOperatorConfigOptions {
                     .defaultValue(10)
                     .withDescription(
                             "Max attempts of automatic reconcile retries on recoverable errors.");
+
+    public static final ConfigOption<Boolean> OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED =
+            ConfigOptions.key("kubernetes.operator.job.upgrade.last-state-fallback.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Enables last-state fallback for savepoint upgrade mode. When the job is not running thus savepoint cannot be triggered but HA metadata is available for last state restore the operator can initiate the upgrade process when the flag is enabled.");
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 688d2d6..2747349 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
@@ -85,7 +86,10 @@ public class ApplicationReconciler
             return availableUpgradeMode;
         }
 
-        if (FlinkUtils.isKubernetesHAActivated(deployConfig)
+        if (deployConfig.getBoolean(
+                        KubernetesOperatorConfigOptions
+                                .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED)
+                && FlinkUtils.isKubernetesHAActivated(deployConfig)
                 && FlinkUtils.isKubernetesHAActivated(observeConfig)
                 && flinkService.isHaMetadataAvailable(deployConfig)) {
             LOG.info(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index c809418..788b770 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -66,6 +66,7 @@ import java.util.Optional;
 import java.util.function.BiConsumer;
 import java.util.stream.Stream;
 
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -562,7 +563,7 @@ public class FlinkDeploymentControllerTest {
             FlinkVersion flinkVersion, UpgradeMode upgradeMode) throws Exception {
         var appCluster = TestUtils.buildApplicationCluster(flinkVersion);
         appCluster.getSpec().getJob().setUpgradeMode(upgradeMode);
-        testUpgradeNotReadyCluster(appCluster);
+        testUpgradeNotReadyCluster(ReconciliationUtils.clone(appCluster));
     }
 
     @Test
@@ -742,6 +743,34 @@ public class FlinkDeploymentControllerTest {
         testController.reconcile(appCluster, context);
         testController.reconcile(appCluster, context);
 
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.RUNNING.name(),
+                appCluster.getStatus().getJobStatus().getState());
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // triggering upgrade with no last-state fallback on non-healthy app
+        flinkService.setPortReady(false);
+        appCluster
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED.key(), "false");
+        appCluster.getSpec().setServiceAccount(appCluster.getSpec().getServiceAccount() + "-5");
+        // not upgrading the cluster with no last-state fallback
+        testController.reconcile(appCluster, context);
+        assertNotEquals(
+                appCluster.getSpec(),
+                appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
+
+        // once the job is ready however the upgrade continues
+        flinkService.setPortReady(true);
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RUNNING.name(),
                 appCluster.getStatus().getJobStatus().getState());