Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/10 23:06:31 UTC

[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #307: [FLINK-28187] Handle SessionJob errors on observe

Aitozi commented on code in PR #307:
URL: https://github.com/apache/flink-kubernetes-operator/pull/307#discussion_r917458759


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java:
##########
@@ -115,4 +131,62 @@ public void observe(FlinkSessionJob flinkSessionJob, Context context) {
         }
         SavepointUtils.resetTriggerIfJobNotRunning(flinkSessionJob, eventRecorder);
     }
+
+    private void checkIfAlreadyUpgraded(
+            FlinkSessionJob flinkSessionJob, Configuration deployedConfig) {
+        var uid = flinkSessionJob.getMetadata().getUid();
+        Collection<JobStatusMessage> jobStatusMessages;
+        try {
+            jobStatusMessages = flinkService.listJobs(deployedConfig);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to list jobs", e);
+        }
+        var matchedJobs = new ArrayList<JobID>();
+        for (JobStatusMessage jobStatusMessage : jobStatusMessages) {
+            var jobId = jobStatusMessage.getJobId();
+            if (jobId.getLowerPart() == uid.hashCode()
+                    && !jobStatusMessage.getJobState().isGloballyTerminalState()) {
+                matchedJobs.add(jobId);
+            }
+        }
+
+        if (matchedJobs.isEmpty()) {
+            return;
+        } else if (matchedJobs.size() > 1) {
+            // this indicates a bug, which means we have more than one running job for a single
+            // SessionJob CR.
+            throw new RuntimeException(
+                    String.format(
+                            "Unexpected case: %d job found for the resource with uid: %s",
+                            matchedJobs.size(), flinkSessionJob.getMetadata().getUid()));
+        } else {
+            var matchedJobID = matchedJobs.get(0);
+            Long upgradeTargetGeneration =
+                    ReconciliationUtils.getUpgradeTargetGeneration(flinkSessionJob);
+            long deployedGeneration = matchedJobID.getUpperPart();
+
+            if (upgradeTargetGeneration == deployedGeneration) {
+                var oldJobID = flinkSessionJob.getStatus().getJobStatus().getJobId();
+                LOG.info(
+                        "Pending upgrade is already deployed, updating status. Old jobID:{}, new jobID:{}",
+                        oldJobID,
+                        matchedJobID.toHexString());
+                ReconciliationUtils.updateStatusForAlreadyUpgraded(flinkSessionJob);
+                flinkSessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+                flinkSessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .setJobId(matchedJobs.get(0).toHexString());
+            } else {
+                LOG.warn(
+                        "Running job {}'s generation {} doesn't match upgrade target generation {}.",
+                        matchedJobID.toHexString(),
+                        deployedGeneration,
+                        upgradeTargetGeneration);
+            }

Review Comment:
   I think we have a chance to get here:
   
   1. The upgrade fails after the `UPGRADING` status is recorded but before the new job is deployed.
   2. The `deployedGeneration` will then be smaller than the `upgradeTargetGeneration`.
   
   In this case, we have to wait for the next reconcile loop, where reconcileSpecChange handles it: it will cancel the old job (the oldJobID recorded in jobStatus) and submit the job again with a new JobID.
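
   To make the scenario concrete, here is a minimal, self-contained sketch of the generation comparison. It does not use the operator's or Flink's classes: `JobKey`, `Outcome`, and `classify` are hypothetical names, and the only thing assumed from the diff is that the lower part of the job ID carries `uid.hashCode()` and the upper part carries the generation.

```java
import java.util.ArrayList;
import java.util.List;

final class GenerationCheck {

    /** Hypothetical outcomes of the check; not part of the operator's API. */
    public enum Outcome {
        NO_RUNNING_JOB,        // nothing deployed for this resource
        ALREADY_UPGRADED,      // running job carries the upgrade target generation
        OLD_JOB_STILL_RUNNING  // upgrade failed before deploy; old generation is running
    }

    /** Hypothetical stand-in for a job ID: lowerPart = uid hash, upperPart = generation. */
    public static final class JobKey {
        final long lowerPart;
        final long upperPart;

        public JobKey(long lowerPart, long upperPart) {
            this.lowerPart = lowerPart;
            this.upperPart = upperPart;
        }
    }

    public static Outcome classify(
            String uid, long upgradeTargetGeneration, List<JobKey> runningJobs) {
        // Keep only the jobs whose lower part matches this resource's uid hash.
        List<JobKey> matched = new ArrayList<>();
        for (JobKey job : runningJobs) {
            if (job.lowerPart == uid.hashCode()) {
                matched.add(job);
            }
        }
        if (matched.isEmpty()) {
            return Outcome.NO_RUNNING_JOB;
        }
        if (matched.size() > 1) {
            throw new IllegalStateException(
                    matched.size() + " running jobs found for uid " + uid);
        }
        long deployedGeneration = matched.get(0).upperPart;
        // A mismatch is the case described above: the status says UPGRADING,
        // but the job that is actually running belongs to another generation.
        return deployedGeneration == upgradeTargetGeneration
                ? Outcome.ALREADY_UPGRADED
                : Outcome.OLD_JOB_STILL_RUNNING;
    }
}
```

   With a running job of generation 1 and an upgrade target of 2, `classify` returns `OLD_JOB_STILL_RUNNING`, which corresponds to the warn branch in the diff: nothing is updated in the observer, and the next reconcile is left to cancel and resubmit.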



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org