Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/10 18:19:10 UTC

[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #307: [FLINK-28187] Handle SessionJob errors on observe

gyfora commented on code in PR #307:
URL: https://github.com/apache/flink-kubernetes-operator/pull/307#discussion_r917430020


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java:
##########
@@ -115,4 +131,62 @@ public void observe(FlinkSessionJob flinkSessionJob, Context context) {
         }
         SavepointUtils.resetTriggerIfJobNotRunning(flinkSessionJob, eventRecorder);
     }
+
+    private void checkIfAlreadyUpgraded(
+            FlinkSessionJob flinkSessionJob, Configuration deployedConfig) {
+        var uid = flinkSessionJob.getMetadata().getUid();
+        Collection<JobStatusMessage> jobStatusMessages;
+        try {
+            jobStatusMessages = flinkService.listJobs(deployedConfig);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to list jobs", e);
+        }
+        var matchedJobs = new ArrayList<JobID>();
+        for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
+            var jobId = jobStatusMessage.getJobId();
+            if (jobId.getLowerPart() == uid.hashCode()
+                    && !jobStatusMessage.getJobState().isGloballyTerminalState()) {
+                matchedJobs.add(jobId);
+            }
+        }
+
+        if (matchedJobs.isEmpty()) {
+            return;
+        } else if (matchedJobs.size() > 1) {
+            // this indicates a bug, which means we have more than one running job for a single
+            // SessionJob CR.
+            throw new RuntimeException(
+                    String.format(
+                            "Unexpected case: %d job found for the resource with uid: %s",
+                            matchedJobs.size(), flinkSessionJob.getMetadata().getUid()));
+        } else {
+            var matchedJobID = matchedJobs.get(0);
+            Long upgradeTargetGeneration =
+                    ReconciliationUtils.getUpgradeTargetGeneration(flinkSessionJob);
+            long deployedGeneration = matchedJobID.getUpperPart();
+
+            if (upgradeTargetGeneration == deployedGeneration) {
+                var oldJobID = flinkSessionJob.getStatus().getJobStatus().getJobId();
+                LOG.info(
+                        "Pending upgrade is already deployed, updating status. Old jobID:{}, new jobID:{}",
+                        oldJobID,
+                        matchedJobID.toHexString());
+                ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkSessionJob);
+                flinkSessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+                flinkSessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .setJobId(matchedJobs.get(0).toHexString());
+            } else {
+                LOG.warn(
+                        "Running job {}'s generation {} doesn't match upgrade target generation {}.",
+                        matchedJobID.toHexString(),
+                        deployedGeneration,
+                        upgradeTargetGeneration);
+            }

Review Comment:
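   I think it might make sense to throw an error at this point instead of only logging a warning. Otherwise we can get a double submission: the new job will have a new JobID due to the different generation, so the job that is already running will not be treated as the same job.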
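
A minimal sketch of the concern, assuming (as the diff above suggests) that the JobID's upper part carries the resource generation and its lower part carries `uid.hashCode()`. `SketchJobId`, `forResource`, `GenerationCheck`, and `verifyGeneration` are hypothetical names used only for illustration; they are not part of Flink or the operator, and the real fix in the PR may look different.

```java
import java.util.Objects;

/** Hypothetical stand-in for a Flink JobID: just two longs. */
final class SketchJobId {
    final long upperPart; // assumption: holds the resource generation
    final long lowerPart; // assumption: holds uid.hashCode()

    SketchJobId(long upperPart, long lowerPart) {
        this.upperPart = upperPart;
        this.lowerPart = lowerPart;
    }

    /** Derives an ID from the resource uid and generation, mirroring the checks in the diff. */
    static SketchJobId forResource(String uid, long generation) {
        return new SketchJobId(generation, uid.hashCode());
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchJobId)) {
            return false;
        }
        SketchJobId other = (SketchJobId) o;
        return upperPart == other.upperPart && lowerPart == other.lowerPart;
    }

    @Override
    public int hashCode() {
        return Objects.hash(upperPart, lowerPart);
    }
}

final class GenerationCheck {
    /**
     * Fails fast when the running job's generation differs from the upgrade target.
     * Only logging here would let a later submission proceed with a different ID,
     * which a cluster would accept as a second, separate job.
     */
    static void verifyGeneration(SketchJobId running, long upgradeTargetGeneration) {
        if (running.upperPart != upgradeTargetGeneration) {
            throw new IllegalStateException(
                    String.format(
                            "Running job generation %d doesn't match upgrade target generation %d",
                            running.upperPart, upgradeTargetGeneration));
        }
    }
}
```

With the same uid, `SketchJobId.forResource(uid, 2)` and `SketchJobId.forResource(uid, 3)` are not equal, which is why a duplicate-ID check on the cluster side would not catch the second submission. Throwing from `verifyGeneration` stops the flow before that point.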


