You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2023/01/01 16:15:35 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-30528] Reconcile other changes if upgrade is not available

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new a1842d4c [FLINK-30528] Reconcile other changes if upgrade is not available
a1842d4c is described below

commit a1842d4c0170feb008293963ec51c0343f42771d
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Dec 29 15:46:05 2022 +0100

    [FLINK-30528] Reconcile other changes if upgrade is not available
---
 .../AbstractFlinkResourceReconciler.java           | 20 +++++----
 .../deployment/AbstractJobReconciler.java          |  5 ++-
 .../deployment/ApplicationReconciler.java          |  7 ++--
 .../reconciler/deployment/SessionReconciler.java   |  3 +-
 .../deployment/ApplicationReconcilerTest.java      | 47 ++++++++++++++++++++++
 5 files changed, 69 insertions(+), 13 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 411b58f6..0ec8d9ed 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -145,9 +145,7 @@ public abstract class AbstractFlinkResourceReconciler<
                         || reconciliationStatus.getState() == ReconciliationState.UPGRADING;
 
         var observeConfig = getObserveConfig(cr, ctx);
-
         if (specChanged) {
-
             if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
                 return;
             }
@@ -162,11 +160,18 @@ public abstract class AbstractFlinkResourceReconciler<
                         EventRecorder.Component.JobManagerDeployment,
                         specChangeMessage);
             }
-            boolean scale = scaleCluster(cr, ctx, observeConfig, deployConfig, specDiff.getType());
-            if (!scale) {
-                reconcileSpecChange(cr, ctx, observeConfig, deployConfig, specDiff.getType());
+            boolean reconciled =
+                    scaleCluster(cr, ctx, observeConfig, deployConfig, specDiff.getType())
+                            || reconcileSpecChange(
+                                    cr, ctx, observeConfig, deployConfig, specDiff.getType());
+            if (reconciled) {
+                // If we executed a scale or spec upgrade action we return, otherwise we continue to
+                // reconcile other changes
+                return;
             }
-        } else if (shouldRollBack(cr, observeConfig, flinkService)) {
+        }
+
+        if (shouldRollBack(cr, observeConfig, flinkService)) {
             // Rollbacks are executed in two steps, we initiate it first then return
             if (initiateRollBack(status)) {
                 return;
@@ -255,8 +260,9 @@ public abstract class AbstractFlinkResourceReconciler<
      * @param observeConfig Observe configuration.
      * @param deployConfig Deployment configuration.
      * @throws Exception Error during spec upgrade.
+     * @return True if spec change reconciliation was executed
      */
-    protected abstract void reconcileSpecChange(
+    protected abstract boolean reconcileSpecChange(
             CR cr,
             Context<?> ctx,
             Configuration observeConfig,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 5140f14a..efa803e9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -82,7 +82,7 @@ public abstract class AbstractJobReconciler<
     }
 
     @Override
-    protected void reconcileSpecChange(
+    protected boolean reconcileSpecChange(
             CR resource,
             Context<?> ctx,
             Configuration observeConfig,
@@ -104,7 +104,7 @@ public abstract class AbstractJobReconciler<
             Optional<UpgradeMode> availableUpgradeMode =
                     getAvailableUpgradeMode(resource, ctx, deployConfig, observeConfig);
             if (availableUpgradeMode.isEmpty()) {
-                return;
+                return false;
             }
 
             eventRecorder.triggerEvent(
@@ -145,6 +145,7 @@ public abstract class AbstractJobReconciler<
 
             ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig);
         }
+        return true;
     }
 
     protected Optional<UpgradeMode> getAvailableUpgradeMode(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 918ec0ec..95110dd3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -135,8 +135,9 @@ public class ApplicationReconciler
             return getAvailableUpgradeMode(deployment, ctx, deployConfig, observeConfig);
         }
 
-        if (jmDeployStatus == JobManagerDeploymentStatus.MISSING
-                || jmDeployStatus == JobManagerDeploymentStatus.ERROR) {
+        if ((jmDeployStatus == JobManagerDeploymentStatus.MISSING
+                        || jmDeployStatus == JobManagerDeploymentStatus.ERROR)
+                && !flinkService.isHaMetadataAvailable(deployConfig)) {
             throw new RecoveryFailureException(
                     "JobManager deployment is missing and HA data is not available to make stateful upgrades. "
                             + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
@@ -145,7 +146,7 @@ public class ApplicationReconciler
         }
 
         LOG.info(
-                "Job is not running yet and HA metadata is not available, waiting for upgradeable state");
+                "Job is not running and HA metadata is not available or usable for executing the upgrade, waiting for upgradeable state");
         return Optional.empty();
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 24e7ecea..c0489292 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -91,7 +91,7 @@ public class SessionReconciler
     }
 
     @Override
-    protected void reconcileSpecChange(
+    protected boolean reconcileSpecChange(
             FlinkDeployment deployment,
             Context<?> ctx,
             Configuration observeConfig,
@@ -113,6 +113,7 @@ public class SessionReconciler
                 Optional.empty(),
                 false);
         ReconciliationUtils.updateStatusForDeployedSpec(deployment, deployConfig);
+        return true;
     }
 
     private void deleteSessionCluster(FlinkDeployment deployment, Configuration effectiveConfig) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 813c4908..da30bd9a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -432,6 +432,7 @@ public class ApplicationReconcilerTest {
                                 .toBuilder()
                                 .jobId(runningJobs.get(0).f1.getJobId().toHexString())
                                 .jobName(runningJobs.get(0).f1.getJobName())
+                                .updateTime(Long.toString(System.currentTimeMillis()))
                                 .state("RUNNING")
                                 .build());
         deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
@@ -706,4 +707,50 @@ public class ApplicationReconcilerTest {
         reconciler.reconcile(deployment, context);
         Assertions.assertEquals(MSG_RESTART_UNHEALTHY, eventCollector.events.remove().getMessage());
     }
+
+    @Test
+    public void testReconcileIfUpgradeModeNotAvailable() throws Exception {
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+
+        // We disable last state fallback as we want to test that the deployment is properly
+        // recovered before upgrade
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(
+                        KubernetesOperatorConfigOptions
+                                .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED
+                                .key(),
+                        "false");
+
+        // Initial deployment
+        reconciler.reconcile(deployment, context);
+
+        // Trigger upgrade but set jobmanager status to missing -> savepoint upgrade not available
+        deployment.getSpec().setRestartNonce(123L);
+        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+        flinkService.clear();
+
+        reconciler.reconcile(deployment, context);
+        // We verify that deployment was recovered before upgrade
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+
+        var lastReconciledSpec =
+                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+        assertNotEquals(
+                deployment.getSpec().getRestartNonce(), lastReconciledSpec.getRestartNonce());
+
+        // Set to running to let savepoint upgrade proceed
+        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+
+        reconciler.reconcile(deployment, context);
+        // Make sure upgrade is properly triggered now
+        lastReconciledSpec =
+                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+        assertEquals(deployment.getSpec().getRestartNonce(), lastReconciledSpec.getRestartNonce());
+        assertEquals(JobState.SUSPENDED, lastReconciledSpec.getJob().getState());
+    }
 }