You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/09/12 12:16:58 UTC

[flink] branch release-1.16 updated: [FLINK-29223][coordination] Add missing output info for jobs already reached terminal state

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new 3035a8e9747 [FLINK-29223][coordination] Add missing output info for jobs already reached terminal state
3035a8e9747 is described below

commit 3035a8e9747c89b1920746d2b87e34e89ef89066
Author: snuyanzin <sn...@gmail.com>
AuthorDate: Thu Sep 8 09:24:39 2022 +0200

    [FLINK-29223][coordination] Add missing output info for jobs already reached terminal state
---
 .../runner/JobDispatcherLeaderProcessFactoryFactory.java       | 10 +++++++---
 .../dispatcher/runner/SessionDispatcherLeaderProcess.java      |  4 ++++
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
index 2ebc3bce8fc..8d38825bc58 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
@@ -131,8 +131,12 @@ public class JobDispatcherLeaderProcessFactoryFactory
             JobGraph jobGraph, Collection<JobResult> dirtyJobResults) {
         final Set<JobID> jobIdsOfFinishedJobs =
                 dirtyJobResults.stream().map(JobResult::getJobId).collect(Collectors.toSet());
-        return jobIdsOfFinishedJobs.contains(jobGraph.getJobID())
-                ? Optional.empty()
-                : Optional.of(jobGraph);
+        if (jobIdsOfFinishedJobs.contains(jobGraph.getJobID())) {
+            LOG.info(
+                    "Skipping recovery of a job with job id {}, because it already reached a globally terminal state",
+                    jobGraph.getJobID());
+            return Optional.empty();
+        }
+        return Optional.of(jobGraph);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
index 63187002acc..5b74536781d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
@@ -148,6 +148,10 @@ public class SessionDispatcherLeaderProcess extends AbstractDispatcherLeaderProc
         for (JobID jobId : jobIds) {
             if (!recoveredDirtyJobResults.contains(jobId)) {
                 tryRecoverJob(jobId).ifPresent(recoveredJobGraphs::add);
+            } else {
+                log.info(
+                        "Skipping recovery of a job with job id {}, because it already reached a globally terminal state",
+                        jobId);
             }
         }