You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/09/12 12:16:58 UTC
[flink] branch release-1.16 updated: [FLINK-29223][coordination] Add missing output info for jobs already reached terminal state
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new 3035a8e9747 [FLINK-29223][coordination] Add missing output info for jobs already reached terminal state
3035a8e9747 is described below
commit 3035a8e9747c89b1920746d2b87e34e89ef89066
Author: snuyanzin <sn...@gmail.com>
AuthorDate: Thu Sep 8 09:24:39 2022 +0200
[FLINK-29223][coordination] Add missing output info for jobs already reached terminal state
---
.../runner/JobDispatcherLeaderProcessFactoryFactory.java | 10 +++++++---
.../dispatcher/runner/SessionDispatcherLeaderProcess.java | 4 ++++
2 files changed, 11 insertions(+), 3 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
index 2ebc3bce8fc..8d38825bc58 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
@@ -131,8 +131,12 @@ public class JobDispatcherLeaderProcessFactoryFactory
JobGraph jobGraph, Collection<JobResult> dirtyJobResults) {
final Set<JobID> jobIdsOfFinishedJobs =
dirtyJobResults.stream().map(JobResult::getJobId).collect(Collectors.toSet());
- return jobIdsOfFinishedJobs.contains(jobGraph.getJobID())
- ? Optional.empty()
- : Optional.of(jobGraph);
+ if (jobIdsOfFinishedJobs.contains(jobGraph.getJobID())) {
+ LOG.info(
+ "Skipping recovery of a job with job id {}, because it already reached a globally terminal state",
+ jobGraph.getJobID());
+ return Optional.empty();
+ }
+ return Optional.of(jobGraph);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
index 63187002acc..5b74536781d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
@@ -148,6 +148,10 @@ public class SessionDispatcherLeaderProcess extends AbstractDispatcherLeaderProc
for (JobID jobId : jobIds) {
if (!recoveredDirtyJobResults.contains(jobId)) {
tryRecoverJob(jobId).ifPresent(recoveredJobGraphs::add);
+ } else {
+ log.info(
+ "Skipping recovery of a job with job id {}, because it already reached a globally terminal state",
+ jobId);
}
}