You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/09/29 20:32:22 UTC

[flink-kubernetes-operator] branch release-1.2 updated (c013c6dd -> 0fb45368)

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a change to branch release-1.2
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


    from c013c6dd [release] Update docs config for release branch
     new ed308277 [FLINK-29464] Persist generated jobid in status before deploy attempt
     new aebf4b82 [FLINK-29469] Generate empty sessionjob jar programatically
     new 0fb45368 [FLINK-29474] Fix global lifecycle metrics group to include resource type

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .dockerignore                                      |   2 -
 .gitignore                                         |   2 -
 .../metrics/lifecycle/LifecycleMetrics.java        |   8 ++-
 .../deployment/ApplicationReconciler.java          |   6 +-
 .../operator/service/AbstractFlinkService.java     |  33 ++++++-----
 .../src/main/resources/noop.jar                    | Bin 305 -> 0 bytes
 .../controller/FlinkDeploymentControllerTest.java  |  20 +++++--
 .../lifecycle/ResourceLifecycleMetricsTest.java    |  61 +++++++++++++++++++--
 .../deployment/ApplicationReconcilerTest.java      |  25 ++++++++-
 9 files changed, 119 insertions(+), 38 deletions(-)
 delete mode 100644 flink-kubernetes-operator/src/main/resources/noop.jar


[flink-kubernetes-operator] 03/03: [FLINK-29474] Fix global lifecycle metrics group to include resource type

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch release-1.2
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 0fb4536898a8d429dcedb8bfb935c10a2268cc58
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Sep 29 16:48:44 2022 +0200

    [FLINK-29474] Fix global lifecycle metrics group to include resource type
---
 .../metrics/lifecycle/LifecycleMetrics.java        |  8 ++-
 .../lifecycle/ResourceLifecycleMetricsTest.java    | 61 +++++++++++++++++++---
 2 files changed, 61 insertions(+), 8 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
index dde85075..8633a8f1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
@@ -160,7 +160,9 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
                                 name ->
                                         Tuple2.of(
                                                 createTransitionHistogram(
-                                                        name, operatorMetricGroup),
+                                                        name,
+                                                        operatorMetricGroup.addGroup(
+                                                                cr.getClass().getSimpleName())),
                                                 new ConcurrentHashMap<>())));
 
         this.stateTimeMetrics = new ConcurrentHashMap<>();
@@ -168,7 +170,9 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
             stateTimeMetrics.put(
                     state,
                     Tuple2.of(
-                            createStateTimeHistogram(state, operatorMetricGroup),
+                            createStateTimeHistogram(
+                                    state,
+                                    operatorMetricGroup.addGroup(cr.getClass().getSimpleName())),
                             new ConcurrentHashMap<>()));
         }
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
index 3240dc6c..cc41bc80 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
@@ -23,13 +23,16 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics;
 import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions;
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
+import org.apache.flink.kubernetes.operator.metrics.TestingMetricListener;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.metrics.Histogram;
 
@@ -52,6 +55,7 @@ import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLif
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Test for resource lifecycle metrics. */
 public class ResourceLifecycleMetricsTest {
@@ -129,7 +133,7 @@ public class ResourceLifecycleMetricsTest {
         lifecycleTracker.onUpdate(UPGRADING, Instant.ofEpochMilli(ts += 1000));
         lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
         lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
-        lifecycleTracker.onUpdate(STABLE, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(STABLE, Instant.ofEpochMilli(ts + 1000));
 
         validateTransition(transitionHistos, "Resume", 1, 4);
         validateTransition(transitionHistos, "Upgrade", 1, 5);
@@ -225,12 +229,57 @@ public class ResourceLifecycleMetricsTest {
         metricManager.onUpdate(dep3);
     }
 
-    public static LifecycleMetrics<FlinkDeployment> getLifeCycleMetrics(
-            MetricManager<FlinkDeployment> metricManager) {
-        for (CustomResourceMetrics<FlinkDeployment> metrics :
-                metricManager.getRegisteredMetrics()) {
+    @Test
+    public void testGlobalHistoNames() {
+        var conf = new Configuration();
+        var testingMetricListener = new TestingMetricListener(new Configuration());
+        var deploymentMetricManager =
+                MetricManager.createFlinkDeploymentMetricManager(
+                        new FlinkConfigManager(conf), testingMetricListener.getMetricGroup());
+        var deploymentLifecycleMetrics = getLifeCycleMetrics(deploymentMetricManager);
+        deploymentLifecycleMetrics.onUpdate(TestUtils.buildApplicationCluster());
+        testGlobalHistoNames(testingMetricListener, FlinkDeployment.class);
+
+        var sessionJobMetricManager =
+                MetricManager.createFlinkSessionJobMetricManager(
+                        new FlinkConfigManager(conf), testingMetricListener.getMetricGroup());
+        var sessionJobLifecycleMetrics = getLifeCycleMetrics(sessionJobMetricManager);
+        sessionJobLifecycleMetrics.onUpdate(TestUtils.buildSessionJob());
+
+        testGlobalHistoNames(testingMetricListener, FlinkSessionJob.class);
+    }
+
+    private void testGlobalHistoNames(TestingMetricListener metricListener, Class<?> resoureClass) {
+        for (var state : ResourceLifecycleState.values()) {
+            assertTrue(
+                    metricListener
+                            .getHistogram(
+                                    String.format(
+                                            metricListener.getMetricId(
+                                                    "%s.Lifecycle.State.%s.TimeSeconds"),
+                                            resoureClass.getSimpleName(),
+                                            state))
+                            .isPresent());
+        }
+
+        for (var transition : LifecycleMetrics.TRACKED_TRANSITIONS) {
+            assertTrue(
+                    metricListener
+                            .getHistogram(
+                                    String.format(
+                                            metricListener.getMetricId(
+                                                    "%s.Lifecycle.Transition.%s.TimeSeconds"),
+                                            resoureClass.getSimpleName(),
+                                            transition.metricName))
+                            .isPresent());
+        }
+    }
+
+    public static <T extends AbstractFlinkResource<?, ?>> LifecycleMetrics<T> getLifeCycleMetrics(
+            MetricManager<T> metricManager) {
+        for (CustomResourceMetrics<?> metrics : metricManager.getRegisteredMetrics()) {
             if (metrics instanceof LifecycleMetrics) {
-                return (LifecycleMetrics<FlinkDeployment>) metrics;
+                return (LifecycleMetrics<T>) metrics;
             }
         }
         return null;


[flink-kubernetes-operator] 02/03: [FLINK-29469] Generate empty sessionjob jar programatically

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch release-1.2
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit aebf4b823e84dc8518508c0064f881b4a2bcca44
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Sep 29 13:48:35 2022 +0200

    [FLINK-29469] Generate empty sessionjob jar programatically
---
 .dockerignore                                      |   2 --
 .gitignore                                         |   2 --
 .../operator/service/AbstractFlinkService.java     |  33 ++++++++++-----------
 .../src/main/resources/noop.jar                    | Bin 305 -> 0 bytes
 4 files changed, 16 insertions(+), 21 deletions(-)

diff --git a/.dockerignore b/.dockerignore
index 60e24e7e..5013ecf0 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -21,5 +21,3 @@
 **/.idea
 **/*.iml
 **/*.DS_Store
-
-!flink-kubernetes-operator/src/main/resources/noop.jar
diff --git a/.gitignore b/.gitignore
index 0d64c613..bb85f4bb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -36,5 +36,3 @@ buildNumber.properties
 .idea
 *.iml
 *.DS_Store
-
-!flink-kubernetes-operator/src/main/resources/noop.jar
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index d05b0415..0b60fabc 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -95,8 +95,8 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketAddress;
@@ -114,6 +114,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
@@ -126,13 +128,13 @@ import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConf
 public abstract class AbstractFlinkService implements FlinkService {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkService.class);
-    private static final String NOOP_JAR_FILENAME = "noop.jar";
+    private static final String EMPTY_JAR_FILENAME = "empty.jar";
 
     protected final KubernetesClient kubernetesClient;
     protected final FlinkConfigManager configManager;
     private final ExecutorService executorService;
     protected final ArtifactManager artifactManager;
-    private final String noopJarPath;
+    private final String emptyJar;
 
     public AbstractFlinkService(
             KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
@@ -142,7 +144,7 @@ public abstract class AbstractFlinkService implements FlinkService {
         this.executorService =
                 Executors.newFixedThreadPool(
                         4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
-        this.noopJarPath = copyNoopJar();
+        this.emptyJar = createEmptyJar();
     }
 
     protected abstract PodList getJmPodList(String namespace, String clusterId);
@@ -717,7 +719,7 @@ public abstract class AbstractFlinkService implements FlinkService {
         if (jobSpec.getJarURI() != null) {
             return jobSpec.getJarURI();
         } else {
-            return noopJarPath;
+            return emptyJar;
         }
     }
 
@@ -836,22 +838,19 @@ public abstract class AbstractFlinkService implements FlinkService {
         }
     }
 
-    private String copyNoopJar() {
+    private String createEmptyJar() {
         try {
-            InputStream noopJarSource =
-                    AbstractFlinkService.class
-                            .getClassLoader()
-                            .getResourceAsStream(NOOP_JAR_FILENAME);
+            String emptyJarPath =
+                    Files.createTempDirectory("flink").toString() + "/" + EMPTY_JAR_FILENAME;
 
-            String noopJarDestination =
-                    Files.createTempDirectory("flink").toString() + "/" + NOOP_JAR_FILENAME;
+            LOG.debug("Creating empty jar to {}", emptyJarPath);
+            JarOutputStream target =
+                    new JarOutputStream(new FileOutputStream(emptyJarPath), new Manifest());
+            target.close();
 
-            LOG.debug("Copying noop jar to {}", noopJarDestination);
-            org.apache.commons.io.FileUtils.copyToFile(noopJarSource, new File(noopJarDestination));
-
-            return noopJarDestination;
+            return emptyJarPath;
         } catch (Exception e) {
-            throw new RuntimeException("Failed to copy noop jar", e);
+            throw new RuntimeException("Failed to create empty jar", e);
         }
     }
 }
diff --git a/flink-kubernetes-operator/src/main/resources/noop.jar b/flink-kubernetes-operator/src/main/resources/noop.jar
deleted file mode 100644
index 57bf8162..00000000
Binary files a/flink-kubernetes-operator/src/main/resources/noop.jar and /dev/null differ


[flink-kubernetes-operator] 01/03: [FLINK-29464] Persist generated jobid in status before deploy attempt

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch release-1.2
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit ed30827778d86de35d8b09c6472b551c1722d2d5
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Sep 29 10:24:11 2022 +0200

    [FLINK-29464] Persist generated jobid in status before deploy attempt
---
 .../deployment/ApplicationReconciler.java          |  6 ++++--
 .../controller/FlinkDeploymentControllerTest.java  | 20 ++++++++++++-----
 .../deployment/ApplicationReconcilerTest.java      | 25 ++++++++++++++++++++--
 3 files changed, 42 insertions(+), 9 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index c0f53519..37236488 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -176,7 +176,7 @@ public class ApplicationReconciler
             flinkService.waitForClusterShutdown(deployConfig);
         }
 
-        setJobIdIfNecessary(spec, status, deployConfig);
+        setJobIdIfNecessary(spec, relatedResource, deployConfig);
 
         eventRecorder.triggerEvent(
                 relatedResource,
@@ -193,7 +193,7 @@ public class ApplicationReconciler
     }
 
     private void setJobIdIfNecessary(
-            FlinkDeploymentSpec spec, FlinkDeploymentStatus status, Configuration deployConfig) {
+            FlinkDeploymentSpec spec, FlinkDeployment resource, Configuration deployConfig) {
         // https://issues.apache.org/jira/browse/FLINK-19358
         // https://issues.apache.org/jira/browse/FLINK-29109
         if (spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_15)) {
@@ -205,6 +205,7 @@ public class ApplicationReconciler
             return;
         }
 
+        var status = resource.getStatus();
         // generate jobId initially or rotate on every deployment when mode is stateless
         if (status.getJobStatus().getJobId() == null
                 || spec.getJob().getUpgradeMode() == UpgradeMode.STATELESS) {
@@ -212,6 +213,7 @@ public class ApplicationReconciler
             // record before first deployment to ensure we use it on any retry
             status.getJobStatus().setJobId(jobId);
             LOG.info("Assigning JobId override to {}", jobId);
+            statusRecorder.patchAndCacheStatus(resource);
         }
 
         String jobId = status.getJobStatus().getJobId();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 36003756..9cc05231 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -116,7 +116,9 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(3, testController.getInternalStatusUpdateCount());
+        assertEquals(
+                flinkVersion.isNewerVersionThan(FlinkVersion.v1_15) ? 3 : 4,
+                testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -139,7 +141,9 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(4, testController.getInternalStatusUpdateCount());
+        assertEquals(
+                flinkVersion.isNewerVersionThan(FlinkVersion.v1_15) ? 4 : 5,
+                testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -153,7 +157,9 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RUNNING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(5, testController.getInternalStatusUpdateCount());
+        assertEquals(
+                flinkVersion.isNewerVersionThan(FlinkVersion.v1_15) ? 5 : 6,
+                testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -168,7 +174,9 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RUNNING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(5, testController.getInternalStatusUpdateCount());
+        assertEquals(
+                flinkVersion.isNewerVersionThan(FlinkVersion.v1_15) ? 5 : 6,
+                testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -194,7 +202,9 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RUNNING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(6, testController.getInternalStatusUpdateCount());
+        assertEquals(
+                flinkVersion.isNewerVersionThan(FlinkVersion.v1_15) ? 6 : 7,
+                testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
 
         reconciliationStatus = appCluster.getStatus().getReconciliationStatus();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 0d4b30cc..3d35d84a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -46,6 +46,7 @@ import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
 
@@ -58,6 +59,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.platform.commons.util.StringUtils;
 
 import java.util.List;
 import java.util.Optional;
@@ -79,12 +81,13 @@ public class ApplicationReconcilerTest {
     private TestingFlinkService flinkService;
     private ApplicationReconciler reconciler;
     private Context<FlinkDeployment> context;
+    private StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder;
 
     @BeforeEach
     public void before() {
         kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
         var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
-        var statusRecoder = new TestingStatusRecorder<FlinkDeployment, FlinkDeploymentStatus>();
+        statusRecorder = new TestingStatusRecorder<FlinkDeployment, FlinkDeploymentStatus>();
         flinkService = new TestingFlinkService(kubernetesClient);
         context = flinkService.getContext();
         reconciler =
@@ -93,7 +96,7 @@ public class ApplicationReconcilerTest {
                         flinkService,
                         configManager,
                         eventRecorder,
-                        statusRecoder);
+                        statusRecorder);
     }
 
     @ParameterizedTest
@@ -549,4 +552,22 @@ public class ApplicationReconcilerTest {
                         .getState());
         assertEquals(4, flinkService.getDesiredReplicas());
     }
+
+    @ParameterizedTest
+    @EnumSource(FlinkVersion.class)
+    public void verifyJobIdNotResetDuringLastStateRecovery(FlinkVersion flinkVersion) {
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
+
+        flinkService.setDeployFailure(true);
+
+        try {
+            reconciler.reconcile(deployment, context);
+        } catch (Exception expected) {
+        }
+        statusRecorder.updateStatusFromCache(deployment);
+
+        if (!flinkVersion.isNewerVersionThan(FlinkVersion.v1_15)) {
+            assertFalse(StringUtils.isBlank(deployment.getStatus().getJobStatus().getJobId()));
+        }
+    }
 }