You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/11 07:29:30 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-28187] Handle SessionJob upgrade errors on observe
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 16c9f45 [FLINK-28187] Handle SessionJob upgrade errors on observe
16c9f45 is described below
commit 16c9f45061d0b6c2ca31f4f0ed98378e70a9f33b
Author: Aitozi <yu...@alibaba-inc.com>
AuthorDate: Mon Jul 11 15:29:25 2022 +0800
[FLINK-28187] Handle SessionJob upgrade errors on observe
---
.../observer/sessionjob/SessionJobObserver.java | 75 +++++++++-
.../AbstractFlinkResourceReconciler.java | 1 +
.../kubernetes/operator/service/FlinkService.java | 14 +-
.../kubernetes/operator/utils/FlinkUtils.java | 24 ++++
.../flink/kubernetes/operator/TestUtils.java | 3 +
.../kubernetes/operator/TestingFlinkService.java | 10 +-
.../sessionjob/SessionJobObserverTest.java | 152 ++++++++++++++++++++-
7 files changed, 270 insertions(+), 9 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index a91edef..5ba397c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -17,15 +17,19 @@
package org.apache.flink.kubernetes.operator.observer.sessionjob;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.observer.SavepointObserver;
import org.apache.flink.kubernetes.operator.observer.context.VoidObserverContext;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -38,6 +42,8 @@ import io.javaoperatorsdk.operator.api.reconciler.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -50,6 +56,7 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
private final EventRecorder eventRecorder;
private final SavepointObserver<FlinkSessionJobStatus> savepointObserver;
private final JobStatusObserver<VoidObserverContext> jobStatusObserver;
+ private final FlinkService flinkService;
public SessionJobObserver(
FlinkService flinkService,
@@ -58,8 +65,9 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
EventRecorder eventRecorder) {
this.configManager = configManager;
this.eventRecorder = eventRecorder;
+ this.flinkService = flinkService;
this.savepointObserver =
- new SavepointObserver(flinkService, configManager, statusRecorder, eventRecorder);
+ new SavepointObserver<>(flinkService, configManager, statusRecorder, eventRecorder);
this.jobStatusObserver =
new JobStatusObserver<>(flinkService, eventRecorder) {
@Override
@@ -106,6 +114,14 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
var deployedConfig =
configManager.getSessionJobConfig(flinkDepOpt.get(), flinkSessionJob.getSpec());
+ var reconciliationStatus = flinkSessionJob.getStatus().getReconciliationStatus();
+ if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
+ checkIfAlreadyUpgraded(flinkSessionJob, deployedConfig);
+ if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
+ return;
+ }
+ }
+
var jobFound =
jobStatusObserver.observe(
flinkSessionJob, deployedConfig, VoidObserverContext.INSTANCE);
@@ -115,4 +131,61 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
}
SavepointUtils.resetTriggerIfJobNotRunning(flinkSessionJob, eventRecorder);
}
+
+ private void checkIfAlreadyUpgraded(
+ FlinkSessionJob flinkSessionJob, Configuration deployedConfig) {
+ var uid = flinkSessionJob.getMetadata().getUid();
+ Collection<JobStatusMessage> jobStatusMessages;
+ try {
+ jobStatusMessages = flinkService.listJobs(deployedConfig);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to list jobs", e);
+ }
+ var matchedJobs = new ArrayList<JobID>();
+ for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
+ var jobId = jobStatusMessage.getJobId();
+ if (jobId.getLowerPart() == uid.hashCode()
+ && !jobStatusMessage.getJobState().isGloballyTerminalState()) {
+ matchedJobs.add(jobId);
+ }
+ }
+
+ if (matchedJobs.isEmpty()) {
+ return;
+ } else if (matchedJobs.size() > 1) {
+ // this indicates a bug, which means we have more than one running job for a single
+ // SessionJob CR.
+ throw new RuntimeException(
+ String.format(
+ "Unexpected case: %d job found for the resource with uid: %s",
+ matchedJobs.size(), flinkSessionJob.getMetadata().getUid()));
+ } else {
+ var matchedJobID = matchedJobs.get(0);
+ Long upgradeTargetGeneration =
+ ReconciliationUtils.getUpgradeTargetGeneration(flinkSessionJob);
+ long deployedGeneration = matchedJobID.getUpperPart();
+ var oldJobID = flinkSessionJob.getStatus().getJobStatus().getJobId();
+
+ if (upgradeTargetGeneration == deployedGeneration) {
+ LOG.info(
+ "Pending upgrade is already deployed, updating status. Old jobID:{}, new jobID:{}",
+ oldJobID,
+ matchedJobID.toHexString());
+ ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkSessionJob);
+ flinkSessionJob
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+ flinkSessionJob.getStatus().getJobStatus().setJobId(matchedJobID.toHexString());
+ } else {
+ var msg =
+ String.format(
+ "Running job %s's generation %s doesn't match upgrade target generation %s.",
+ matchedJobID.toHexString(),
+ deployedGeneration,
+ upgradeTargetGeneration);
+ throw new RuntimeException(msg);
+ }
+ }
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 2d26e82..903b4e1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -115,6 +115,7 @@ public abstract class AbstractFlinkResourceReconciler<
deployConfig,
Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
false);
+
ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig);
return;
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index aca8cff..2deccae 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -235,19 +235,21 @@ public class FlinkService {
Configuration conf,
@Nullable String savepoint)
throws Exception {
- var jarRunResponseBody =
- runJar(spec.getJob(), uploadJar(meta, spec, conf), conf, savepoint);
- var jobID = jarRunResponseBody.getJobId();
+ // we generate jobID in advance to help deduplicate job submission.
+ var jobID = FlinkUtils.generateSessionJobFixedJobID(meta);
+ runJar(spec.getJob(), jobID, uploadJar(meta, spec, conf), conf, savepoint);
LOG.info("Submitted job: {} to session cluster.", jobID);
return jobID;
}
private JarRunResponseBody runJar(
- JobSpec job, JarUploadResponseBody response, Configuration conf, String savepoint) {
+ JobSpec job,
+ JobID jobID,
+ JarUploadResponseBody response,
+ Configuration conf,
+ String savepoint) {
String jarId =
response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
- // we generate jobID in advance to help deduplicate job submission.
- JobID jobID = new JobID();
try (RestClusterClient<String> clusterClient =
(RestClusterClient<String>) getClusterClient(conf)) {
JarRunHeaders headers = JarRunHeaders.getInstance();
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 2a3df03..c1de80e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.utils;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
@@ -31,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -315,4 +317,26 @@ public class FlinkUtils {
labels.put(CR_GENERATION_LABEL, generation.toString());
conf.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, labels);
}
+
+ /**
+ * The jobID's lower part is the uid's hash code, the upper part is the resource generation.
+ *
+ * @param meta the meta of the resource.
+ * @return the generated jobID.
+ */
+ public static JobID generateSessionJobFixedJobID(ObjectMeta meta) {
+ return generateSessionJobFixedJobID(meta.getUid(), meta.getGeneration());
+ }
+
+ /**
+ * The jobID's lower part is the uid's hash code, the upper part is the resource generation.
+ *
+ * @param uid the uid of the resource.
+ * @param generation the generation of the resource.
+ * @return the generated jobID.
+ */
+ public static JobID generateSessionJobFixedJobID(String uid, Long generation) {
+ return new JobID(
+ Preconditions.checkNotNull(uid).hashCode(), Preconditions.checkNotNull(generation));
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 038ffd7..bdefa4b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -82,6 +82,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@@ -146,6 +147,8 @@ public class TestUtils {
.withName(TEST_SESSION_JOB_NAME)
.withNamespace(TEST_NAMESPACE)
.withCreationTimestamp(Instant.now().toString())
+ .withUid(UUID.randomUUID().toString())
+ .withGeneration(1L)
.build());
sessionJob.setSpec(
FlinkSessionJobSpec.builder()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 94ce385..439b3ec 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -93,6 +93,7 @@ public class TestingFlinkService extends FlinkService {
private boolean isPortReady = true;
private boolean haDataAvailable = true;
private boolean deployFailure = false;
+ private Runnable sessionJobSubmittedCallback;
private PodList podList = new PodList();
private Consumer<Configuration> listJobConsumer = conf -> {};
private List<String> disposedSavepoints = new ArrayList<>();
@@ -180,6 +181,10 @@ public class TestingFlinkService extends FlinkService {
this.deployFailure = deployFailure;
}
+ public void setSessionJobSubmittedCallback(Runnable sessionJobSubmittedCallback) {
+ this.sessionJobSubmittedCallback = sessionJobSubmittedCallback;
+ }
+
@Override
public void submitSessionCluster(Configuration conf) {
sessions.add(conf.get(KubernetesConfigOptions.CLUSTER_ID));
@@ -196,7 +201,7 @@ public class TestingFlinkService extends FlinkService {
if (deployFailure) {
throw new Exception("Deployment failure");
}
- JobID jobID = new JobID();
+ JobID jobID = FlinkUtils.generateSessionJobFixedJobID(meta);
JobStatusMessage jobStatusMessage =
new JobStatusMessage(
jobID,
@@ -205,6 +210,9 @@ public class TestingFlinkService extends FlinkService {
System.currentTimeMillis());
jobs.add(Tuple2.of(savepoint, jobStatusMessage));
+ if (sessionJobSubmittedCallback != null) {
+ sessionJobSubmittedCallback.run();
+ }
return jobID;
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index 1fc1be8..e7270bd 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.observer.sessionjob;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
@@ -26,10 +27,13 @@ import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -45,7 +49,7 @@ public class SessionJobObserverTest {
private KubernetesClient kubernetesClient;
private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
- private final TestingFlinkService flinkService = new TestingFlinkService();
+ private TestingFlinkService flinkService;
private SessionJobObserver observer;
private SessionJobReconciler reconciler;
@@ -54,6 +58,7 @@ public class SessionJobObserverTest {
kubernetesClient.resource(TestUtils.buildSessionJob()).createOrReplace();
var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
var statusRecorder = new TestingStatusRecorder<FlinkSessionJobStatus>();
+ flinkService = new TestingFlinkService();
observer =
new SessionJobObserver(flinkService, configManager, statusRecorder, eventRecorder);
reconciler =
@@ -191,4 +196,149 @@ public class SessionJobObserverTest {
Assertions.assertFalse(
SavepointUtils.savepointInProgress(sessionJob.getStatus().getJobStatus()));
}
+
+ @Test
+ public void testObserveAlreadySubmitted() {
+ final var sessionJob = TestUtils.buildSessionJob();
+ sessionJob.getMetadata().setGeneration(10L);
+ final var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
+
+ flinkService.setSessionJobSubmittedCallback(
+ () -> {
+ throw new RuntimeException("Failed after submitted job");
+ });
+ // submit job
+ Assertions.assertThrows(
+ RuntimeException.class,
+ () -> {
+ reconciler.reconcile(sessionJob, readyContext);
+ });
+ Assertions.assertNotNull(sessionJob.getStatus().getReconciliationStatus());
+ Assertions.assertEquals(
+ ReconciliationState.UPGRADING,
+ sessionJob.getStatus().getReconciliationStatus().getState());
+ Assertions.assertNull(sessionJob.getStatus().getJobStatus().getJobId());
+
+ observer.observe(sessionJob, readyContext);
+ Assertions.assertEquals(
+ ReconciliationState.DEPLOYED,
+ sessionJob.getStatus().getReconciliationStatus().getState());
+ var jobID = sessionJob.getStatus().getJobStatus().getJobId();
+ Assertions.assertNotNull(jobID);
+ Assertions.assertEquals(10, JobID.fromHexString(jobID).getUpperPart());
+ Assertions.assertEquals(
+ JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ }
+
+ @Test
+ public void testObserveAlreadyUpgraded() throws Exception {
+ final var sessionJob = TestUtils.buildSessionJob();
+ sessionJob.getMetadata().setGeneration(10L);
+ final var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
+
+ reconciler.reconcile(sessionJob, readyContext);
+ observer.observe(sessionJob, readyContext);
+ Assertions.assertEquals(
+ ReconciliationState.DEPLOYED,
+ sessionJob.getStatus().getReconciliationStatus().getState());
+ var jobID = sessionJob.getStatus().getJobStatus().getJobId();
+ Assertions.assertNotNull(jobID);
+ Assertions.assertEquals(10, JobID.fromHexString(jobID).getUpperPart());
+ Assertions.assertEquals(
+ JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+
+ flinkService.setSessionJobSubmittedCallback(
+ () -> {
+ throw new RuntimeException("Failed after submitted job");
+ });
+ sessionJob.getSpec().getJob().setParallelism(10);
+ sessionJob.getMetadata().setGeneration(11L);
+
+ // upgrade
+ Assertions.assertThrows(
+ RuntimeException.class,
+ () -> {
+ // suspend
+ reconciler.reconcile(sessionJob, readyContext);
+ // upgrade
+ reconciler.reconcile(sessionJob, readyContext);
+ });
+
+ Assertions.assertEquals(
+ ReconciliationState.UPGRADING,
+ sessionJob.getStatus().getReconciliationStatus().getState());
+ // jobID not changed
+ Assertions.assertEquals(jobID, sessionJob.getStatus().getJobStatus().getJobId());
+
+ observer.observe(sessionJob, readyContext);
+
+ Assertions.assertEquals(
+ ReconciliationState.DEPLOYED,
+ sessionJob.getStatus().getReconciliationStatus().getState());
+ Assertions.assertEquals(
+ 11L,
+ JobID.fromHexString(sessionJob.getStatus().getJobStatus().getJobId())
+ .getUpperPart());
+ }
+
+ @Test
+ public void testOrphanedJob() throws Exception {
+ final var sessionJob = TestUtils.buildSessionJob();
+ sessionJob.getMetadata().setGeneration(10L);
+ final var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
+
+ reconciler.reconcile(sessionJob, readyContext);
+ observer.observe(sessionJob, readyContext);
+ Assertions.assertEquals(
+ ReconciliationState.DEPLOYED,
+ sessionJob.getStatus().getReconciliationStatus().getState());
+ var jobID = sessionJob.getStatus().getJobStatus().getJobId();
+ Assertions.assertNotNull(jobID);
+ Assertions.assertEquals(10, JobID.fromHexString(jobID).getUpperPart());
+ Assertions.assertEquals(
+ JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+
+ flinkService.setSessionJobSubmittedCallback(
+ () -> {
+ throw new RuntimeException("Failed after submitted job");
+ });
+ sessionJob.getSpec().getJob().setParallelism(10);
+ sessionJob.getMetadata().setGeneration(11L);
+ // upgrade
+ Assertions.assertThrows(
+ RuntimeException.class,
+ () -> {
+ // suspend
+ reconciler.reconcile(sessionJob, readyContext);
+ // upgrade
+ reconciler.reconcile(sessionJob, readyContext);
+ });
+
+ Assertions.assertEquals(
+ ReconciliationState.UPGRADING,
+ sessionJob.getStatus().getReconciliationStatus().getState());
+ // jobID not changed
+ Assertions.assertEquals(jobID, sessionJob.getStatus().getJobStatus().getJobId());
+
+ // mock the appearance of a job with a different id for the target CR
+ var jobs = flinkService.listJobs();
+ for (Tuple2<String, JobStatusMessage> job : jobs) {
+ if (!job.f1.getJobState().isGloballyTerminalState()
+ && !job.f1.getJobId().toHexString().equals(jobID)) {
+ job.f1 =
+ new JobStatusMessage(
+ FlinkUtils.generateSessionJobFixedJobID(
+ sessionJob.getMetadata().getUid(), -1L),
+ job.f1.getJobName(),
+ job.f1.getJobState(),
+ job.f1.getStartTime());
+ }
+ }
+
+ var exception =
+ Assertions.assertThrows(
+ RuntimeException.class, () -> observer.observe(sessionJob, readyContext));
+ Assertions.assertTrue(
+ exception.getMessage().contains("doesn't match upgrade target generation"));
+ }
}