You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by th...@apache.org on 2022/08/31 17:02:28 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-29100] Relax upgrade checks to allow stateless restart when no stable spec is present

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d7c8cee [FLINK-29100] Relax upgrade checks to allow stateless restart when no stable spec is present
6d7c8cee is described below

commit 6d7c8ceebb9ee0b83eb7036c57003177b7f2fd82
Author: Thomas Weise <th...@apache.org>
AuthorDate: Tue Aug 30 21:17:33 2022 -0400

    [FLINK-29100] Relax upgrade checks to allow stateless restart when no stable spec is present
---
 .../deployment/ApplicationReconciler.java          | 41 +++++++++++++++++++---
 .../deployment/ApplicationReconcilerTest.java      | 38 ++++++++++++++++++++
 2 files changed, 75 insertions(+), 4 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 42aabaad..43520df3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -98,12 +99,19 @@ public class ApplicationReconciler
                                 .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED)
                 && FlinkUtils.isKubernetesHAActivated(deployConfig)
                 && FlinkUtils.isKubernetesHAActivated(observeConfig)
-                && flinkService.isHaMetadataAvailable(deployConfig)
                 && !flinkVersionChanged(
                         ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) {
-            LOG.info(
-                    "Job is not running but HA metadata is available for last state restore, ready for upgrade");
-            return Optional.of(UpgradeMode.LAST_STATE);
+
+            if (!flinkService.isHaMetadataAvailable(deployConfig)) {
+                if (deployment.getStatus().getReconciliationStatus().getLastStableSpec() == null) {
+                    // initial deployment failure, reset to allow for spec change to proceed
+                    return resetOnMissingStableSpec(deployment, deployConfig);
+                }
+            } else {
+                LOG.info(
+                        "Job is not running but HA metadata is available for last state restore, ready for upgrade");
+                return Optional.of(UpgradeMode.LAST_STATE);
+            }
         }
 
         if (status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING
@@ -120,6 +128,31 @@ public class ApplicationReconciler
         return Optional.empty();
     }
 
+    private Optional<UpgradeMode> resetOnMissingStableSpec(
+            FlinkDeployment deployment, Configuration deployConfig) {
+        // Initial deployment never became stable: tear down the cluster so the new spec can proceed
+        flinkService.deleteClusterDeployment(
+                deployment.getMetadata(), deployment.getStatus(), false);
+        flinkService.waitForClusterShutdown(deployConfig);
+        if (!flinkService.isHaMetadataAvailable(deployConfig)) {
+            LOG.info(
+                    "Job never entered stable state. Clearing previous spec to reset for initial deploy");
+            // TODO: lastSpecWithMeta.f1.isFirstDeployment() is false
+            // ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(deployment);
+            deployment.getStatus().getReconciliationStatus().setLastReconciledSpec(null);
+            // UPGRADING triggers immediate reconciliation
+            deployment
+                    .getStatus()
+                    .getReconciliationStatus()
+                    .setState(ReconciliationState.UPGRADING);
+            return Optional.empty();
+        } else {
+            // HA metadata appeared between the caller's check and the deletion: use last-state upgrade
+            LOG.info("Found HA state after deployment deletion, falling back to stateful upgrade");
+            return Optional.of(UpgradeMode.LAST_STATE);
+        }
+    }
+
     @Override
     protected void deploy(
             FlinkDeployment relatedResource,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 73a707b0..e1f04ec5 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -147,6 +147,11 @@ public class ApplicationReconcilerTest {
 
         deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
         deployment.getSpec().setRestartNonce(100L);
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .setLastStableSpec(
+                        deployment.getStatus().getReconciliationStatus().getLastReconciledSpec());
         flinkService.setHaDataAvailable(false);
         deployment.getStatus().getJobStatus().setState("RECONCILING");
 
@@ -184,6 +189,39 @@ public class ApplicationReconcilerTest {
         assertEquals("finished_sp", runningJobs.get(0).f0);
     }
 
+    @ParameterizedTest
+    @EnumSource(UpgradeMode.class)
+    public void testUpgradeBeforeReachingStableSpec(UpgradeMode upgradeMode) throws Exception {
+        flinkService.setHaDataAvailable(false);
+
+        final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+
+        reconciler.reconcile(deployment, context);
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+
+        // Spec change while the first deployment is still DEPLOYING; reconciliation should proceed
+        final String newImage = "new-image-1";
+        deployment.getSpec().getJob().setUpgradeMode(upgradeMode);
+        deployment.getSpec().setImage(newImage);
+        reconciler.reconcile(deployment, context);
+        if (!UpgradeMode.STATELESS.equals(upgradeMode)) {
+            assertNull(deployment.getStatus().getReconciliationStatus().getLastReconciledSpec());
+            assertEquals(
+                    ReconciliationState.UPGRADING,
+                    deployment.getStatus().getReconciliationStatus().getState());
+            reconciler.reconcile(deployment, context);
+        }
+        assertEquals(
+                newImage,
+                deployment
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpec()
+                        .getImage());
+    }
+
     @Test
     public void testUpgradeModeChangeFromSavepointToLastState() throws Exception {
         final String expectedSavepointPath = "savepoint_0";