You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/12/28 16:10:32 UTC

[flink-kubernetes-operator] branch release-1.3 updated: [FLINK-30406] Detect when jobmanager never started

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/release-1.3 by this push:
     new 5ed117cb [FLINK-30406] Detect when jobmanager never started
5ed117cb is described below

commit 5ed117cb8e3e2b00f2abeb0f98d7555d78e0afe3
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Wed Dec 28 17:10:26 2022 +0100

    [FLINK-30406] Detect when jobmanager never started
---
 .../observer/deployment/SessionObserver.java       |   2 +-
 .../AbstractFlinkResourceReconciler.java           |  44 ++++-
 .../deployment/AbstractJobReconciler.java          |  17 +-
 .../deployment/ApplicationReconciler.java          |  46 ++---
 .../kubernetes/operator/utils/FlinkUtils.java      |  35 ++++
 .../flink/kubernetes/operator/TestUtils.java       |  17 +-
 .../operator/TestingApplicationReconciler.java     |  57 ++++++
 .../kubernetes/operator/TestingFlinkService.java   |   7 +-
 .../controller/FailedDeploymentRestartTest.java    |   3 +
 .../controller/FlinkDeploymentControllerTest.java  |  15 +-
 .../controller/UnhealthyDeploymentRestartTest.java |   3 -
 .../deployment/ApplicationReconcilerTest.java      |   4 +-
 .../ApplicationReconcilerUpgradeModeTest.java      | 218 +++++++++++++++++++--
 .../kubernetes/operator/utils/FlinkUtilsTest.java  |  43 ++++
 14 files changed, 450 insertions(+), 61 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
index 41279e13..df4b61c8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
@@ -51,7 +51,7 @@ public class SessionObserver extends AbstractFlinkDeploymentObserver {
                 rs.markReconciledSpecAsStable();
             }
         } catch (Exception e) {
-            logger.error("REST service in session cluster is bad now", e);
+            logger.error("REST service in session cluster failed", e);
             if (e instanceof TimeoutException) {
                 // check for problems with the underlying deployment
                 observeJmDeployment(deployment, context, observerContext.getDeployedConfig());
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index d1c2c743..29a724d5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -27,9 +27,12 @@ import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.JobState;
+import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
 import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.api.status.Savepoint;
+import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
@@ -93,7 +96,7 @@ public abstract class AbstractFlinkResourceReconciler<
     }

     @Override
-    public final void reconcile(CR cr, Context<?> ctx) throws Exception {
+    public void reconcile(CR cr, Context<?> ctx) throws Exception {
         var spec = cr.getSpec();
         var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
         var status = cr.getStatus();
@@ -109,12 +112,7 @@ public abstract class AbstractFlinkResourceReconciler<
         // No further logic is required at this point.
         if (reconciliationStatus.isBeforeFirstDeployment()) {
             LOG.info("Deploying for the first time");
-
-            // Before we try to submit the job we record the current spec in the status so we can
-            // handle subsequent deployment and status update errors
-            ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig);
-            statusRecorder.patchAndCacheStatus(cr);
-
+            updateStatusBeforeFirstDeployment(cr, spec, deployConfig, status);
             deploy(
                     cr,
                     spec,
@@ -177,6 +175,36 @@ public abstract class AbstractFlinkResourceReconciler<
         }
     }

+    /**
+     * Update the status before the first deployment. For jobs, the upgrade mode of the spec is
+     * overwritten in place: SAVEPOINT if an initial savepoint path is set, STATELESS otherwise.
+     *
+     * @param cr Related flink resource, whose status receives the to-be-deployed spec
+     * @param spec Spec to be deployed, its job upgrade mode is modified in place
+     * @param deployConfig Deploy configuration
+     * @param status Resource status, receives the initial savepoint (if any) as last savepoint
+     */
+    private void updateStatusBeforeFirstDeployment(
+            CR cr, SPEC spec, Configuration deployConfig, STATUS status) {
+        if (spec.getJob() != null) {
+            var initialUpgradeMode = UpgradeMode.STATELESS;
+            var initialSp = spec.getJob().getInitialSavepointPath();
+
+            if (initialSp != null) {
+                status.getJobStatus()
+                        .getSavepointInfo()
+                        .setLastSavepoint(Savepoint.of(initialSp, SavepointTriggerType.UNKNOWN));
+                initialUpgradeMode = UpgradeMode.SAVEPOINT;
+            }
+
+            spec.getJob().setUpgradeMode(initialUpgradeMode);
+        }
+        // Before we try to submit the job we record the current spec in the status so we can
+        // handle subsequent deployment and status update errors
+        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig);
+        statusRecorder.patchAndCacheStatus(cr);
+    }
+
     /**
      * Get Flink configuration object for deploying the given spec using {@link #deploy}.
      *
@@ -249,7 +277,7 @@ public abstract class AbstractFlinkResourceReconciler<
             CR cr, Context<?> context, Configuration observeConfig) throws Exception;

     @Override
-    public final DeleteControl cleanup(CR resource, Context<?> context) {
+    public DeleteControl cleanup(CR resource, Context<?> context) {
         return cleanupInternal(resource, context);
     }

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 370d96dd..3c3de7a7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -114,7 +114,7 @@ public abstract class AbstractJobReconciler<
                 LOG.info("Upgrading/Restarting running job, suspending first...");
             }
             Optional<UpgradeMode> availableUpgradeMode =
-                    getAvailableUpgradeMode(resource, deployConfig, observeConfig);
+                    getAvailableUpgradeMode(resource, ctx, deployConfig, observeConfig);
             if (availableUpgradeMode.isEmpty()) {
                 return;
             }
@@ -134,7 +134,14 @@ public abstract class AbstractJobReconciler<
                 ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig);
             }
         }
+
         if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
+            // Keep the last reconciled upgrade mode unless a stateless upgrade is requested
+            if (currentDeploySpec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
+                currentDeploySpec
+                        .getJob()
+                        .setUpgradeMode(lastReconciledSpec.getJob().getUpgradeMode());
+            }
             // We record the target spec into an upgrading state before deploying
             ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, deployConfig);
             statusRecorder.patchAndCacheStatus(resource);
@@ -153,7 +160,7 @@ public abstract class AbstractJobReconciler<
     }

     protected Optional<UpgradeMode> getAvailableUpgradeMode(
-            CR resource, Configuration deployConfig, Configuration observeConfig) {
+            CR resource, Context<?> ctx, Configuration deployConfig, Configuration observeConfig) {
         var status = resource.getStatus();
         var upgradeMode = resource.getSpec().getJob().getUpgradeMode();

@@ -162,7 +169,9 @@ public abstract class AbstractJobReconciler<
             return Optional.of(UpgradeMode.STATELESS);
         }

-        if (ReconciliationUtils.isJobInTerminalState(status)) {
+        var flinkService = getFlinkService(resource, ctx);
+        if (ReconciliationUtils.isJobInTerminalState(status)
+                && !flinkService.isHaMetadataAvailable(observeConfig)) {
             LOG.info(
                     "Job is in terminal state, ready for upgrade from observed latest checkpoint/savepoint");
             return Optional.of(UpgradeMode.SAVEPOINT);
@@ -215,6 +224,7 @@ public abstract class AbstractJobReconciler<
             throws Exception {
         var reconciliationStatus = resource.getStatus().getReconciliationStatus();
         var rollbackSpec = reconciliationStatus.deserializeLastStableSpec();
+        rollbackSpec.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);

         UpgradeMode upgradeMode = resource.getSpec().getJob().getUpgradeMode();

@@ -261,6 +271,7 @@ public abstract class AbstractJobReconciler<
             throws Exception {
         LOG.info("Resubmitting Flink job...");
         SPEC specToRecover = ReconciliationUtils.getDeployedSpec(deployment);
+        specToRecover.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
         restoreJob(
                 deployment,
                 specToRecover,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index ee40da92..2acc61ec 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.reconciler.deployment;

 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.PipelineOptionsInternal;
@@ -92,11 +93,14 @@ public class ApplicationReconciler

     @Override
     protected Optional<UpgradeMode> getAvailableUpgradeMode(
-            FlinkDeployment deployment, Configuration deployConfig, Configuration observeConfig) {
+            FlinkDeployment deployment,
+            Context<?> ctx,
+            Configuration deployConfig,
+            Configuration observeConfig) {

         var status = deployment.getStatus();
         var availableUpgradeMode =
-                super.getAvailableUpgradeMode(deployment, deployConfig, observeConfig);
+                super.getAvailableUpgradeMode(deployment, ctx, deployConfig, observeConfig);

         if (availableUpgradeMode.isPresent()) {
             return availableUpgradeMode;
@@ -110,20 +114,29 @@ public class ApplicationReconciler
                 && !flinkVersionChanged(
                         ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) {

-            if (!flinkService.isHaMetadataAvailable(deployConfig)) {
-                if (deployment.getStatus().getReconciliationStatus().getLastStableSpec() == null) {
-                    // initial deployment failure, reset to allow for spec change to proceed
-                    return resetOnMissingStableSpec(deployment, deployConfig);
-                }
-            } else {
+            if (flinkService.isHaMetadataAvailable(deployConfig)) {
                 LOG.info(
                         "Job is not running but HA metadata is available for last state restore, ready for upgrade");
                 return Optional.of(UpgradeMode.LAST_STATE);
             }
         }

-        if (status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING
-                || status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.ERROR) {
+        var jmDeployStatus = status.getJobManagerDeploymentStatus();
+        if (jmDeployStatus != JobManagerDeploymentStatus.MISSING
+                && status.getReconciliationStatus()
+                                .deserializeLastReconciledSpec()
+                                .getJob()
+                                .getUpgradeMode()
+                        != UpgradeMode.LAST_STATE
+                && FlinkUtils.jmPodNeverStarted(ctx)) {
+            // A JM that never started cannot have taken checkpoints we are unaware of, so it is
+            // safe to delete it and re-evaluate the upgrade mode with the job marked as FAILED
+            deleteJmThatNeverStarted(deployment, deployConfig);
+            return getAvailableUpgradeMode(deployment, ctx, deployConfig, observeConfig);
+        }
+
+        if (jmDeployStatus == JobManagerDeploymentStatus.MISSING
+                || jmDeployStatus == JobManagerDeploymentStatus.ERROR) {
             throw new RecoveryFailureException(
                     "JobManager deployment is missing and HA data is not available to make stateful upgrades. "
                             + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
@@ -136,21 +149,16 @@ public class ApplicationReconciler
         return Optional.empty();
     }

-    private Optional<UpgradeMode> resetOnMissingStableSpec(
-            FlinkDeployment deployment, Configuration deployConfig) {
-        // initial deployment failure, reset to allow for spec change to proceed
+    /** Mark the job FAILED and delete the jobmanager deployment that never became available. */
+    private void deleteJmThatNeverStarted(FlinkDeployment deployment, Configuration deployConfig) {
+        deployment.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
         flinkService.deleteClusterDeployment(
                 deployment.getMetadata(), deployment.getStatus(), false);
         flinkService.waitForClusterShutdown(deployConfig);
-        if (!flinkService.isHaMetadataAvailable(deployConfig)) {
-            LOG.info("Job never entered stable state. Resetting status for initial deploy");
-            ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(deployment);
-            return Optional.empty();
-        } else {
-            // proceed with upgrade if deployment succeeded between check and delete
-            LOG.info("Found HA state after deployment deletion, falling back to stateful upgrade");
-            return Optional.of(UpgradeMode.LAST_STATE);
-        }
+        // The context may still return the deleted deployment, so explicitly record it as
+        // MISSING to guarantee that the re-evaluation in getAvailableUpgradeMode terminates
+        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+        LOG.info("Deleted jobmanager deployment that never started.");
     }

     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 221d0558..572845c1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -38,7 +38,10 @@ import io.fabric8.kubernetes.api.model.ConfigMap;
 import io.fabric8.kubernetes.api.model.ConfigMapList;
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -46,6 +49,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Optional;

 /** Flink Utility methods used by the operator. */
 public class FlinkUtils {
@@ -221,4 +225,51 @@ public class FlinkUtils {
         return new JobID(
                 Preconditions.checkNotNull(uid).hashCode(), Preconditions.checkNotNull(generation));
     }
+
+    /**
+     * Check if the jobmanager pod has never successfully started. This is an important check to
+     * determine whether it is possible that the job has started and taken any checkpoints that we
+     * are unaware of.
+     *
+     * <p>The way we check this is by using the availability condition transition timestamp. If the
+     * deployment never transitioned out of the unavailable state, we can assume that the JM never
+     * started.
+     *
+     * <p>If the deployment, its status, its conditions or its creation timestamp are missing, we
+     * cannot be sure and return false.
+     *
+     * @param context Resource context
+     * @return True only if we are sure that the jobmanager pod never started
+     */
+    public static boolean jmPodNeverStarted(Context<?> context) {
+        Optional<Deployment> depOpt = context.getSecondaryResource(Deployment.class);
+        if (depOpt.isEmpty()) {
+            return false;
+        }
+
+        Deployment deployment = depOpt.get();
+        var status = deployment.getStatus();
+        var meta = deployment.getMetadata();
+        if (status == null || status.getConditions() == null || meta == null) {
+            return false;
+        }
+
+        var createTs = meta.getCreationTimestamp();
+        if (createTs == null) {
+            return false;
+        }
+
+        for (DeploymentCondition condition : status.getConditions()) {
+            // Available=False with a transition time equal to the creation time means the
+            // deployment has been unavailable ever since it was created
+            if ("Available".equals(condition.getType())
+                    && "False".equals(condition.getStatus())
+                    && createTs.equals(condition.getLastTransitionTime())) {
+                return true;
+            }
+        }
+
+        // If unsure, return false to be on the safe side
+        return false;
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index aac61d84..1d99f256 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -57,6 +57,7 @@ import java.net.HttpURLConnection;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -94,15 +95,27 @@ public class TestUtils extends BaseTestUtils {
     }

     public static Deployment createDeployment(boolean ready) {
-        DeploymentStatus status = new DeploymentStatus();
+        var timestamp = Instant.now().toString();
+        var status = new DeploymentStatus();
         status.setAvailableReplicas(ready ? 1 : 0);
         status.setReplicas(1);
+        // "Available" condition whose last transition coincides with the creation timestamp
+        var condition = new DeploymentCondition();
+        condition.setType("Available");
+        condition.setStatus(ready ? "True" : "False");
+        condition.setLastTransitionTime(timestamp);
+        status.setConditions(List.of(condition));
         DeploymentSpec spec = new DeploymentSpec();
         spec.setReplicas(1);
+
+        var metadata = new ObjectMeta();
+        metadata.setCreationTimestamp(timestamp);
+
         Deployment deployment = new Deployment();
-        deployment.setMetadata(new ObjectMeta());
+        deployment.setMetadata(metadata);
         deployment.setSpec(spec);
         deployment.setStatus(status);
+
         return deployment;
     }

diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingApplicationReconciler.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingApplicationReconciler.java
new file mode 100644
index 00000000..3bcc6283
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingApplicationReconciler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+
+/**
+ * Testing wrapper for {@link ApplicationReconciler}.
+ *
+ * <p>Each call works on a clone of the resource that still shares the original status object.
+ * Status updates made by the reconciler are therefore visible to the test, spec changes are not.
+ */
+public class TestingApplicationReconciler extends ApplicationReconciler {
+    public TestingApplicationReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder,
+            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder) {
+        super(kubernetesClient, flinkService, configManager, eventRecorder, statusRecorder);
+    }
+
+    @Override
+    public void reconcile(FlinkDeployment flinkDeployment, Context<?> context) throws Exception {
+        super.reconcile(cloneWithSharedStatus(flinkDeployment), context);
+    }
+
+    @Override
+    public DeleteControl cleanup(FlinkDeployment flinkDeployment, Context<?> context) {
+        return super.cleanup(cloneWithSharedStatus(flinkDeployment), context);
+    }
+
+    /** Clone the given resource while keeping a reference to its original status object. */
+    private static FlinkDeployment cloneWithSharedStatus(FlinkDeployment original) {
+        FlinkDeployment copy = ReconciliationUtils.clone(original);
+        copy.setStatus(original.getStatus());
+        return copy;
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index a62bd732..c37fd54f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -95,6 +95,7 @@ public class TestingFlinkService extends AbstractFlinkService {
     private final Set<String> sessions = new HashSet<>();
     private boolean isPortReady = true;
     private boolean haDataAvailable = true;
+    private boolean jobManagerReady = true;
     private boolean deployFailure = false;
     private Runnable sessionJobSubmittedCallback;
     private PodList podList = new PodList();
@@ -122,7 +123,7 @@ public class TestingFlinkService extends AbstractFlinkService {
                 if (jobs.isEmpty() && sessions.isEmpty()) {
                     return Optional.empty();
                 }
-                return (Optional<T>) Optional.of(TestUtils.createDeployment(true));
+                return (Optional<T>) Optional.of(TestUtils.createDeployment(jobManagerReady));
             }
         };
     }
@@ -193,6 +194,11 @@ public class TestingFlinkService extends AbstractFlinkService {
         this.haDataAvailable = haDataAvailable;
     }

+    /** Sets whether the mocked jobmanager deployment is created as ready. */
+    public void setJobManagerReady(boolean ready) {
+        jobManagerReady = ready;
+    }
+
     public void setDeployFailure(boolean deployFailure) {
         this.deployFailure = deployFailure;
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
index 99a6ca37..f34d8040 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
@@ -96,6 +96,9 @@ public class FailedDeploymentRestartTest {
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
+
+        // Deployed without an initial savepoint, so the recorded upgrade mode is STATELESS
+        appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
         assertEquals(
                 appCluster.getSpec(),
                 appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 4ba5bb44..ebc1241a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -845,8 +845,13 @@ public class FlinkDeploymentControllerTest {
     private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Exception {
         flinkService.clear();
         testController.reconcile(appCluster, context);
+        // The first deployment is expected to be recorded with STATELESS upgrade mode
+        var expectedInitialSpec = ReconciliationUtils.clone(appCluster.getSpec());
+        if (expectedInitialSpec.getJob() != null) {
+            expectedInitialSpec.getJob().setUpgradeMode(UpgradeMode.STATELESS);
+        }
         assertEquals(
-                appCluster.getSpec(),
+                expectedInitialSpec,
                 appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());

         flinkService.setPortReady(false);
@@ -864,8 +869,16 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYING,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // Stateful job specs are expected to be recorded with LAST_STATE upgrade mode here
+        var expectedRecordedSpec = ReconciliationUtils.clone(appCluster.getSpec());
+        if (expectedRecordedSpec.getJob() != null
+                && expectedRecordedSpec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
+            expectedRecordedSpec.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+        }
+
         assertEquals(
-                appCluster.getSpec(),
+                expectedRecordedSpec,
                 appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());

         flinkService.setPortReady(true);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
index 07dae7de..a4f67544 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
@@ -100,9 +100,6 @@ public class UnhealthyDeploymentRestartTest {
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
-        assertEquals(
-                appCluster.getSpec(),
-                appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
     }

     private static Stream<Arguments> applicationTestParams() {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 6c2f6ee2..5b7ac44c 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingApplicationReconciler;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -107,7 +108,7 @@ public class ApplicationReconcilerTest {
         flinkService = new TestingFlinkService(kubernetesClient);
         context = flinkService.getContext();
         reconciler =
-                new ApplicationReconciler(
+                new TestingApplicationReconciler(
                         kubernetesClient,
                         flinkService,
                         configManager,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index 2445741f..6cdd47fc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingApplicationReconciler;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -47,10 +48,14 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -75,7 +80,7 @@ public class ApplicationReconcilerUpgradeModeTest {
         flinkService = new TestingFlinkService(kubernetesClient);
         context = flinkService.getContext();
         reconciler =
-                new ApplicationReconciler(
+                new TestingApplicationReconciler(
                         kubernetesClient,
                         flinkService,
                         configManager,
@@ -259,36 +264,213 @@ public class ApplicationReconcilerUpgradeModeTest {
     }
 
     @ParameterizedTest
-    @EnumSource(UpgradeMode.class)
-    public void testUpgradeBeforeReachingStableSpec(UpgradeMode upgradeMode) throws Exception {
+    @MethodSource("testUpgradeJmDeployCannotStartParams")
+    public void testUpgradeJmDeployCannotStart(UpgradeMode fromMode, UpgradeMode toMode)
+            throws Exception {
+        // Start with HA metadata available and a JobManager that reports ready
+        flinkService.setHaDataAvailable(true);
+        flinkService.setJobManagerReady(true);
+
+        // Deploy a running job using fromMode and record its jobs in the status
+        var deployment = TestUtils.buildApplicationCluster();
+        var jobSpec = deployment.getSpec().getJob();
+        jobSpec.setUpgradeMode(fromMode);
+
+        reconciler.reconcile(deployment, context);
+        var runningJobs = flinkService.listJobs();
+        verifyAndSetRunningJobsToStatus(deployment, runningJobs);
+
+        // Suspend the job; the last reconciled spec must still record fromMode
+        jobSpec.setState(JobState.SUSPENDED);
+        reconciler.reconcile(deployment, context);
+
+        var lastReconciledSpec =
+                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+        assertEquals(JobState.SUSPENDED, lastReconciledSpec.getJob().getState());
+        assertEquals(fromMode, lastReconciledSpec.getJob().getUpgradeMode());
+
+        // Resume in toMode; only a STATELESS target replaces the recorded fromMode
+        jobSpec.setState(JobState.RUNNING);
+        jobSpec.setUpgradeMode(toMode);
+        reconciler.reconcile(deployment, context);
+
+        lastReconciledSpec =
+                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+        assertEquals(JobState.RUNNING, lastReconciledSpec.getJob().getState());
+        assertEquals(
+                toMode == UpgradeMode.STATELESS ? UpgradeMode.STATELESS : fromMode,
+                lastReconciledSpec.getJob().getUpgradeMode());
+
+        // Simulate a JM failure (and lost HA metadata) after the restore so that the next upgrade
+        // is exercised against a jobmanager that never started
+        flinkService.setJobManagerReady(false);
         flinkService.setHaDataAvailable(false);
 
-        final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+
+        // Submit a new upgrade while the jobmanager has still not started
+        jobSpec.setState(JobState.RUNNING);
+        jobSpec.setEntryClass("newClass");
+        reconciler.reconcile(deployment, context);
+        lastReconciledSpec =
+                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+
+        // Make sure the upgrade is executed whenever the savepoint information is available
+        if (fromMode == UpgradeMode.LAST_STATE && toMode != UpgradeMode.STATELESS) {
+            // We can't make progress: no HA metadata is available after the LAST_STATE upgrade,
+            // which means the job started and terminated without us observing it
+            assertEquals(
+                    JobManagerDeploymentStatus.DEPLOYING,
+                    deployment.getStatus().getJobManagerDeploymentStatus());
+            assertEquals(JobState.RUNNING, lastReconciledSpec.getJob().getState());
+        } else {
+            assertEquals(
+                    JobManagerDeploymentStatus.MISSING,
+                    deployment.getStatus().getJobManagerDeploymentStatus());
+            assertEquals(JobState.SUSPENDED, lastReconciledSpec.getJob().getState());
+            assertEquals(
+                    toMode == UpgradeMode.STATELESS ? UpgradeMode.STATELESS : UpgradeMode.SAVEPOINT,
+                    lastReconciledSpec.getJob().getUpgradeMode());
+
+            // Complete the upgrade and recover successfully from the latest savepoint
+            reconciler.reconcile(deployment, context);
+            lastReconciledSpec =
+                    deployment
+                            .getStatus()
+                            .getReconciliationStatus()
+                            .deserializeLastReconciledSpec();
+            // Expect one running job, restored from savepoint_0 unless STATELESS was involved
+            assertEquals(JobState.RUNNING, lastReconciledSpec.getJob().getState());
+            assertEquals(1, flinkService.listJobs().size());
+            if (fromMode == UpgradeMode.STATELESS || toMode == UpgradeMode.STATELESS) {
+                assertNull(flinkService.listJobs().get(0).f0);
+            } else {
+                assertEquals("savepoint_0", flinkService.listJobs().get(0).f0);
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("testInitialJmDeployCannotStartParams")
+    public void testInitialJmDeployCannotStart(UpgradeMode upgradeMode, boolean initSavepoint)
+            throws Exception {
+
+        // Simulate a JM failure to test the initial submission/upgrade behavior when the JM
+        // never manages to start after the first deployment
+        flinkService.setHaDataAvailable(false);
+        flinkService.setJobManagerReady(false);
+
+        var deployment = TestUtils.buildApplicationCluster();
+        if (initSavepoint) {
+            deployment.getSpec().getJob().setInitialSavepointPath("init-sp");
+        }
 
         reconciler.reconcile(deployment, context);
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYING,
                 deployment.getStatus().getJobManagerDeploymentStatus());
 
-        // Ready for spec changes, the reconciliation should be performed
+        var lastReconciledSpec =
+                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+
+        // Make sure the savepoint path is recorded in the status and the upgradeMode is set
+        // correctly for the initial startup: SAVEPOINT or STATELESS, depending only on the
+        // initialSavepointPath setting.
+        if (initSavepoint) {
+            assertEquals("init-sp", flinkService.listJobs().get(0).f0);
+            assertEquals(
+                    "init-sp",
+                    deployment
+                            .getStatus()
+                            .getJobStatus()
+                            .getSavepointInfo()
+                            .getLastSavepoint()
+                            .getLocation());
+            assertEquals(UpgradeMode.SAVEPOINT, lastReconciledSpec.getJob().getUpgradeMode());
+        } else {
+            assertNull(flinkService.listJobs().get(0).f0);
+            assertNull(deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());
+            assertEquals(UpgradeMode.STATELESS, lastReconciledSpec.getJob().getUpgradeMode());
+        }
+
+        // The JM has failed, but we submit an upgrade anyway; this must always be possible after
+        // an initial deployment failure
         final String newImage = "new-image-1";
         deployment.getSpec().getJob().setUpgradeMode(upgradeMode);
         deployment.getSpec().setImage(newImage);
         reconciler.reconcile(deployment, context);
-        if (!UpgradeMode.STATELESS.equals(upgradeMode)) {
-            assertNull(deployment.getStatus().getReconciliationStatus().getLastReconciledSpec());
-            assertEquals(
-                    ReconciliationState.UPGRADING,
-                    deployment.getStatus().getReconciliationStatus().getState());
-            reconciler.reconcile(deployment, context);
-        }
         assertEquals(
-                newImage,
-                deployment
-                        .getStatus()
-                        .getReconciliationStatus()
-                        .deserializeLastReconciledSpec()
-                        .getImage());
+                ReconciliationState.UPGRADING,
+                deployment.getStatus().getReconciliationStatus().getState());
+        lastReconciledSpec =
+                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+
+        // A STATELESS upgrade request must be respected (state is dropped); otherwise SAVEPOINT
+        assertEquals(
+                upgradeMode == UpgradeMode.STATELESS
+                        ? UpgradeMode.STATELESS
+                        : UpgradeMode.SAVEPOINT,
+                lastReconciledSpec.getJob().getUpgradeMode());
+        // The next reconcile completes the upgrade and redeploys with the new image
+        reconciler.reconcile(deployment, context);
+        lastReconciledSpec =
+                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+        assertEquals(newImage, lastReconciledSpec.getImage());
+        assertEquals(
+                upgradeMode == UpgradeMode.STATELESS
+                        ? UpgradeMode.STATELESS
+                        : UpgradeMode.SAVEPOINT,
+                lastReconciledSpec.getJob().getUpgradeMode());
+        assertEquals(1, flinkService.listJobs().size());
+        assertEquals(
+                initSavepoint && upgradeMode != UpgradeMode.STATELESS ? "init-sp" : null,
+                flinkService.listJobs().get(0).f0);
+    }
+
+    private static Stream<Arguments> testInitialJmDeployCannotStartParams() {
+        // Every upgrade mode, each with and without an initial savepoint path. Derived from the
+        // enum (like testUpgradeJmDeployCannotStartParams) so new modes are covered automatically.
+        return Stream.of(UpgradeMode.values())
+                .flatMap(
+                        mode ->
+                                Stream.of(true, false)
+                                        .map(initSavepoint -> Arguments.of(mode, initSavepoint)));
+    }
+
+    private static Stream<Arguments> testUpgradeJmDeployCannotStartParams() {
+        // All (fromMode, toMode) pairs; index i encodes a pair with fromMode varying slowest
+        UpgradeMode[] modes = UpgradeMode.values();
+        List<Arguments> pairs = new ArrayList<>(modes.length * modes.length);
+        for (int i = 0; i < modes.length * modes.length; i++) {
+            pairs.add(Arguments.of(modes[i / modes.length], modes[i % modes.length]));
+        }
+        return pairs.stream();
+    }
+
+    @Test
+    public void testLastStateOnDeletedDeployment() throws Exception {
+        // Get an application deployment using SAVEPOINT upgrades into the running state
+        var app = TestUtils.buildApplicationCluster();
+        app.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+        reconciler.reconcile(app, context);
+        verifyAndSetRunningJobsToStatus(app, flinkService.listJobs());
+
+        // Remove the cluster deployment while leaving its HA metadata in place
+        flinkService.deleteClusterDeployment(app.getMetadata(), app.getStatus(), false);
+        flinkService.setHaDataAvailable(true);
+
+        // Request an upgrade by changing the restart nonce
+        app.getSpec().setRestartNonce(123L);
+        reconciler.reconcile(app, context);
+
+        // HA metadata survived, so the upgrade must be recorded as LAST_STATE (job suspended)
+        var reconciledJob =
+                app.getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpec()
+                        .getJob();
+        assertEquals(UpgradeMode.LAST_STATE, reconciledJob.getUpgradeMode());
+        assertEquals(JobState.SUSPENDED, reconciledJob.getState());
     }
 
     @Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index ca7d0c39..136e9311 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -30,16 +30,22 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import io.fabric8.kubernetes.api.model.ConfigMap;
 import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
 import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
 import org.junit.jupiter.api.Test;
 
 import java.net.HttpURLConnection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -149,6 +155,43 @@ public class FlinkUtilsTest {
                         kubernetesClient));
     }
 
+    @Test
+    public void testJmNeverStartedDetection() {
+        var jmDeployment = new Deployment();
+        jmDeployment.setMetadata(new ObjectMeta());
+        jmDeployment.getMetadata().setCreationTimestamp("create-ts");
+        jmDeployment.setStatus(new DeploymentStatus());
+        var deployStatus = jmDeployment.getStatus();
+        // "Available" conditions; only the first one carries the deployment's creation timestamp
+        var jmNeverStartedCondition =
+                new DeploymentCondition("create-ts", null, null, null, "False", "Available");
+        var jmStartedButStopped =
+                new DeploymentCondition("other-ts", null, null, null, "False", "Available");
+        var jmAvailable =
+                new DeploymentCondition("other-ts", null, null, null, "True", "Available");
+        var context =
+                new TestUtils.TestingContext<Deployment>() {
+                    @Override
+                    public <R> Optional<R> getSecondaryResource(Class<R> aClass, String name) {
+                        return Optional.of(aClass.cast(jmDeployment));
+                    }
+                };
+        // No conditions reported: nothing indicates that the JM never started
+        deployStatus.setConditions(Collections.emptyList());
+        assertFalse(FlinkUtils.jmPodNeverStarted(context));
+        // Unavailable ever since the deployment was created: the JM never started
+        deployStatus.setConditions(List.of(jmNeverStartedCondition));
+        assertTrue(FlinkUtils.jmPodNeverStarted(context));
+        // Unavailable, but the condition changed after creation: the JM did start at some point
+        deployStatus.setConditions(List.of(jmStartedButStopped));
+        assertFalse(FlinkUtils.jmPodNeverStarted(context));
+        // Currently available
+        deployStatus.setConditions(List.of(jmAvailable));
+        assertFalse(FlinkUtils.jmPodNeverStarted(context));
+        // Empty context, presumably without any JM deployment to inspect
+        assertFalse(FlinkUtils.jmPodNeverStarted(TestUtils.createEmptyContext()));
+    }
+
     @Test
     public void testDeleteJobGraphInKubernetesHAShouldNotUpdateWithEmptyConfigMap() {
         final String name = "empty-ha-configmap";