You are viewing a plain-text version of this content. The link to the canonical version (a hyperlink in the original page) is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/11 07:29:30 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-28187] Handle SessionJob upgrade errors on observe

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 16c9f45  [FLINK-28187] Handle SessionJob upgrade errors on observe
16c9f45 is described below

commit 16c9f45061d0b6c2ca31f4f0ed98378e70a9f33b
Author: Aitozi <yu...@alibaba-inc.com>
AuthorDate: Mon Jul 11 15:29:25 2022 +0800

    [FLINK-28187] Handle SessionJob upgrade errors on observe
---
 .../observer/sessionjob/SessionJobObserver.java    |  75 +++++++++-
 .../AbstractFlinkResourceReconciler.java           |   1 +
 .../kubernetes/operator/service/FlinkService.java  |  14 +-
 .../kubernetes/operator/utils/FlinkUtils.java      |  24 ++++
 .../flink/kubernetes/operator/TestUtils.java       |   3 +
 .../kubernetes/operator/TestingFlinkService.java   |  10 +-
 .../sessionjob/SessionJobObserverTest.java         | 152 ++++++++++++++++++++-
 7 files changed, 270 insertions(+), 9 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index a91edef..5ba397c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -17,15 +17,19 @@
 
 package org.apache.flink.kubernetes.operator.observer.sessionjob;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.observer.SavepointObserver;
 import org.apache.flink.kubernetes.operator.observer.context.VoidObserverContext;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -38,6 +42,8 @@ import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -50,6 +56,7 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
     private final EventRecorder eventRecorder;
     private final SavepointObserver<FlinkSessionJobStatus> savepointObserver;
     private final JobStatusObserver<VoidObserverContext> jobStatusObserver;
+    private final FlinkService flinkService;
 
     public SessionJobObserver(
             FlinkService flinkService,
@@ -58,8 +65,9 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
             EventRecorder eventRecorder) {
         this.configManager = configManager;
         this.eventRecorder = eventRecorder;
+        this.flinkService = flinkService;
         this.savepointObserver =
-                new SavepointObserver(flinkService, configManager, statusRecorder, eventRecorder);
+                new SavepointObserver<>(flinkService, configManager, statusRecorder, eventRecorder);
         this.jobStatusObserver =
                 new JobStatusObserver<>(flinkService, eventRecorder) {
                     @Override
@@ -106,6 +114,14 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
 
         var deployedConfig =
                 configManager.getSessionJobConfig(flinkDepOpt.get(), flinkSessionJob.getSpec());
+        var reconciliationStatus = flinkSessionJob.getStatus().getReconciliationStatus();
+        if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
+            checkIfAlreadyUpgraded(flinkSessionJob, deployedConfig);
+            if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
+                return;
+            }
+        }
+
         var jobFound =
                 jobStatusObserver.observe(
                         flinkSessionJob, deployedConfig, VoidObserverContext.INSTANCE);
@@ -115,4 +131,61 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
         }
         SavepointUtils.resetTriggerIfJobNotRunning(flinkSessionJob, eventRecorder);
     }
+
+    private void checkIfAlreadyUpgraded(
+            FlinkSessionJob flinkSessionJob, Configuration deployedConfig) {
+        var uid = flinkSessionJob.getMetadata().getUid();
+        Collection<JobStatusMessage> jobStatusMessages;
+        try {
+            jobStatusMessages = flinkService.listJobs(deployedConfig);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to list jobs", e);
+        }
+        var matchedJobs = new ArrayList<JobID>();
+        for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
+            var jobId = jobStatusMessage.getJobId();
+            if (jobId.getLowerPart() == uid.hashCode()
+                    && !jobStatusMessage.getJobState().isGloballyTerminalState()) {
+                matchedJobs.add(jobId);
+            }
+        }
+
+        if (matchedJobs.isEmpty()) {
+            return;
+        } else if (matchedJobs.size() > 1) {
+            // this indicates a bug, which means we have more than one running job for a single
+            // SessionJob CR.
+            throw new RuntimeException(
+                    String.format(
+                            "Unexpected case: %d job found for the resource with uid: %s",
+                            matchedJobs.size(), flinkSessionJob.getMetadata().getUid()));
+        } else {
+            var matchedJobID = matchedJobs.get(0);
+            Long upgradeTargetGeneration =
+                    ReconciliationUtils.getUpgradeTargetGeneration(flinkSessionJob);
+            long deployedGeneration = matchedJobID.getUpperPart();
+            var oldJobID = flinkSessionJob.getStatus().getJobStatus().getJobId();
+
+            if (upgradeTargetGeneration == deployedGeneration) {
+                LOG.info(
+                        "Pending upgrade is already deployed, updating status. Old jobID:{}, new jobID:{}",
+                        oldJobID,
+                        matchedJobID.toHexString());
+                ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkSessionJob);
+                flinkSessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+                flinkSessionJob.getStatus().getJobStatus().setJobId(matchedJobID.toHexString());
+            } else {
+                var msg =
+                        String.format(
+                                "Running job %s's generation %s doesn't match upgrade target generation %s.",
+                                matchedJobID.toHexString(),
+                                deployedGeneration,
+                                upgradeTargetGeneration);
+                throw new RuntimeException(msg);
+            }
+        }
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 2d26e82..903b4e1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -115,6 +115,7 @@ public abstract class AbstractFlinkResourceReconciler<
                     deployConfig,
                     Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
                     false);
+
             ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig);
             return;
         }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index aca8cff..2deccae 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -235,19 +235,21 @@ public class FlinkService {
             Configuration conf,
             @Nullable String savepoint)
             throws Exception {
-        var jarRunResponseBody =
-                runJar(spec.getJob(), uploadJar(meta, spec, conf), conf, savepoint);
-        var jobID = jarRunResponseBody.getJobId();
+        // we generate jobID in advance to help deduplicate job submission.
+        var jobID = FlinkUtils.generateSessionJobFixedJobID(meta);
+        runJar(spec.getJob(), jobID, uploadJar(meta, spec, conf), conf, savepoint);
         LOG.info("Submitted job: {} to session cluster.", jobID);
         return jobID;
     }
 
     private JarRunResponseBody runJar(
-            JobSpec job, JarUploadResponseBody response, Configuration conf, String savepoint) {
+            JobSpec job,
+            JobID jobID,
+            JarUploadResponseBody response,
+            Configuration conf,
+            String savepoint) {
         String jarId =
                 response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
-        // we generate jobID in advance to help deduplicate job submission.
-        JobID jobID = new JobID();
         try (RestClusterClient<String> clusterClient =
                 (RestClusterClient<String>) getClusterClient(conf)) {
             JarRunHeaders headers = JarRunHeaders.getInstance();
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 2a3df03..c1de80e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -31,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -315,4 +317,26 @@ public class FlinkUtils {
         labels.put(CR_GENERATION_LABEL, generation.toString());
         conf.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, labels);
     }
+
+    /**
+     * The jobID's lower part is the resource uid, the higher part is the resource generation.
+     *
+     * @param meta the meta of the resource.
+     * @return the generated jobID.
+     */
+    public static JobID generateSessionJobFixedJobID(ObjectMeta meta) {
+        return generateSessionJobFixedJobID(meta.getUid(), meta.getGeneration());
+    }
+
+    /**
+     * The jobID's lower part is the resource uid, the higher part is the resource generation.
+     *
+     * @param uid the uid of the resource.
+     * @param generation the generation of the resource.
+     * @return the generated jobID.
+     */
+    public static JobID generateSessionJobFixedJobID(String uid, Long generation) {
+        return new JobID(
+                Preconditions.checkNotNull(uid).hashCode(), Preconditions.checkNotNull(generation));
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 038ffd7..bdefa4b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -82,6 +82,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
@@ -146,6 +147,8 @@ public class TestUtils {
                         .withName(TEST_SESSION_JOB_NAME)
                         .withNamespace(TEST_NAMESPACE)
                         .withCreationTimestamp(Instant.now().toString())
+                        .withUid(UUID.randomUUID().toString())
+                        .withGeneration(1L)
                         .build());
         sessionJob.setSpec(
                 FlinkSessionJobSpec.builder()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 94ce385..439b3ec 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -93,6 +93,7 @@ public class TestingFlinkService extends FlinkService {
     private boolean isPortReady = true;
     private boolean haDataAvailable = true;
     private boolean deployFailure = false;
+    private Runnable sessionJobSubmittedCallback;
     private PodList podList = new PodList();
     private Consumer<Configuration> listJobConsumer = conf -> {};
     private List<String> disposedSavepoints = new ArrayList<>();
@@ -180,6 +181,10 @@ public class TestingFlinkService extends FlinkService {
         this.deployFailure = deployFailure;
     }
 
+    public void setSessionJobSubmittedCallback(Runnable sessionJobSubmittedCallback) {
+        this.sessionJobSubmittedCallback = sessionJobSubmittedCallback;
+    }
+
     @Override
     public void submitSessionCluster(Configuration conf) {
         sessions.add(conf.get(KubernetesConfigOptions.CLUSTER_ID));
@@ -196,7 +201,7 @@ public class TestingFlinkService extends FlinkService {
         if (deployFailure) {
             throw new Exception("Deployment failure");
         }
-        JobID jobID = new JobID();
+        JobID jobID = FlinkUtils.generateSessionJobFixedJobID(meta);
         JobStatusMessage jobStatusMessage =
                 new JobStatusMessage(
                         jobID,
@@ -205,6 +210,9 @@ public class TestingFlinkService extends FlinkService {
                         System.currentTimeMillis());
 
         jobs.add(Tuple2.of(savepoint, jobStatusMessage));
+        if (sessionJobSubmittedCallback != null) {
+            sessionJobSubmittedCallback.run();
+        }
         return jobID;
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index 1fc1be8..e7270bd 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.observer.sessionjob;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
@@ -26,10 +27,13 @@ import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -45,7 +49,7 @@ public class SessionJobObserverTest {
 
     private KubernetesClient kubernetesClient;
     private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
-    private final TestingFlinkService flinkService = new TestingFlinkService();
+    private TestingFlinkService flinkService;
     private SessionJobObserver observer;
     private SessionJobReconciler reconciler;
 
@@ -54,6 +58,7 @@ public class SessionJobObserverTest {
         kubernetesClient.resource(TestUtils.buildSessionJob()).createOrReplace();
         var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
         var statusRecorder = new TestingStatusRecorder<FlinkSessionJobStatus>();
+        flinkService = new TestingFlinkService();
         observer =
                 new SessionJobObserver(flinkService, configManager, statusRecorder, eventRecorder);
         reconciler =
@@ -191,4 +196,149 @@ public class SessionJobObserverTest {
         Assertions.assertFalse(
                 SavepointUtils.savepointInProgress(sessionJob.getStatus().getJobStatus()));
     }
+
+    @Test
+    public void testObserveAlreadySubmitted() {
+        final var sessionJob = TestUtils.buildSessionJob();
+        sessionJob.getMetadata().setGeneration(10L);
+        final var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
+
+        flinkService.setSessionJobSubmittedCallback(
+                () -> {
+                    throw new RuntimeException("Failed after submitted job");
+                });
+        // submit job
+        Assertions.assertThrows(
+                RuntimeException.class,
+                () -> {
+                    reconciler.reconcile(sessionJob, readyContext);
+                });
+        Assertions.assertNotNull(sessionJob.getStatus().getReconciliationStatus());
+        Assertions.assertEquals(
+                ReconciliationState.UPGRADING,
+                sessionJob.getStatus().getReconciliationStatus().getState());
+        Assertions.assertNull(sessionJob.getStatus().getJobStatus().getJobId());
+
+        observer.observe(sessionJob, readyContext);
+        Assertions.assertEquals(
+                ReconciliationState.DEPLOYED,
+                sessionJob.getStatus().getReconciliationStatus().getState());
+        var jobID = sessionJob.getStatus().getJobStatus().getJobId();
+        Assertions.assertNotNull(jobID);
+        Assertions.assertEquals(10, JobID.fromHexString(jobID).getUpperPart());
+        Assertions.assertEquals(
+                JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+    }
+
+    @Test
+    public void testObserveAlreadyUpgraded() throws Exception {
+        final var sessionJob = TestUtils.buildSessionJob();
+        sessionJob.getMetadata().setGeneration(10L);
+        final var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
+
+        reconciler.reconcile(sessionJob, readyContext);
+        observer.observe(sessionJob, readyContext);
+        Assertions.assertEquals(
+                ReconciliationState.DEPLOYED,
+                sessionJob.getStatus().getReconciliationStatus().getState());
+        var jobID = sessionJob.getStatus().getJobStatus().getJobId();
+        Assertions.assertNotNull(jobID);
+        Assertions.assertEquals(10, JobID.fromHexString(jobID).getUpperPart());
+        Assertions.assertEquals(
+                JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+
+        flinkService.setSessionJobSubmittedCallback(
+                () -> {
+                    throw new RuntimeException("Failed after submitted job");
+                });
+        sessionJob.getSpec().getJob().setParallelism(10);
+        sessionJob.getMetadata().setGeneration(11L);
+
+        // upgrade
+        Assertions.assertThrows(
+                RuntimeException.class,
+                () -> {
+                    // suspend
+                    reconciler.reconcile(sessionJob, readyContext);
+                    // upgrade
+                    reconciler.reconcile(sessionJob, readyContext);
+                });
+
+        Assertions.assertEquals(
+                ReconciliationState.UPGRADING,
+                sessionJob.getStatus().getReconciliationStatus().getState());
+        // jobID not changed
+        Assertions.assertEquals(jobID, sessionJob.getStatus().getJobStatus().getJobId());
+
+        observer.observe(sessionJob, readyContext);
+
+        Assertions.assertEquals(
+                ReconciliationState.DEPLOYED,
+                sessionJob.getStatus().getReconciliationStatus().getState());
+        Assertions.assertEquals(
+                11L,
+                JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId())
+                        .getUpperPart());
+    }
+
+    @Test
+    public void testOrphanedJob() throws Exception {
+        final var sessionJob = TestUtils.buildSessionJob();
+        sessionJob.getMetadata().setGeneration(10L);
+        final var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
+
+        reconciler.reconcile(sessionJob, readyContext);
+        observer.observe(sessionJob, readyContext);
+        Assertions.assertEquals(
+                ReconciliationState.DEPLOYED,
+                sessionJob.getStatus().getReconciliationStatus().getState());
+        var jobID = sessionJob.getStatus().getJobStatus().getJobId();
+        Assertions.assertNotNull(jobID);
+        Assertions.assertEquals(10, JobID.fromHexString(jobID).getUpperPart());
+        Assertions.assertEquals(
+                JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+
+        flinkService.setSessionJobSubmittedCallback(
+                () -> {
+                    throw new RuntimeException("Failed after submitted job");
+                });
+        sessionJob.getSpec().getJob().setParallelism(10);
+        sessionJob.getMetadata().setGeneration(11L);
+        // upgrade
+        Assertions.assertThrows(
+                RuntimeException.class,
+                () -> {
+                    // suspend
+                    reconciler.reconcile(sessionJob, readyContext);
+                    // upgrade
+                    reconciler.reconcile(sessionJob, readyContext);
+                });
+
+        Assertions.assertEquals(
+                ReconciliationState.UPGRADING,
+                sessionJob.getStatus().getReconciliationStatus().getState());
+        // jobID not changed
+        Assertions.assertEquals(jobID, sessionJob.getStatus().getJobStatus().getJobId());
+
+        // mock a job with different id of the target CR occurs
+        var jobs = flinkService.listJobs();
+        for (Tuple2<String, JobStatusMessage> job : jobs) {
+            if (!job.f1.getJobState().isGloballyTerminalState()
+                    && !job.f1.getJobId().toHexString().equals(jobID)) {
+                job.f1 =
+                        new JobStatusMessage(
+                                FlinkUtils.generateSessionJobFixedJobID(
+                                        sessionJob.getMetadata().getUid(), -1L),
+                                job.f1.getJobName(),
+                                job.f1.getJobState(),
+                                job.f1.getStartTime());
+            }
+        }
+
+        var exception =
+                Assertions.assertThrows(
+                        RuntimeException.class, () -> observer.observe(sessionJob, readyContext));
+        Assertions.assertTrue(
+                exception.getMessage().contains("doesn't match upgrade target generation"));
+    }
 }