You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/30 12:41:44 UTC
flink git commit: [FLINK-5197] [jm] Ignore outdated JobStatusChanged
messages
Repository: flink
Updated Branches:
refs/heads/release-1.1 0758d0be6 -> 569a9666f
[FLINK-5197] [jm] Ignore outdated JobStatusChanged messages
Outdated JobStatusChanged messages no longer trigger a RemoveJob message but are
logged and ignored. This has the advantage that an outdated JobStatusChanged message
cannot interfere with a recovered job, which can have the same job id.
This closes #2896.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/569a9666
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/569a9666
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/569a9666
Branch: refs/heads/release-1.1
Commit: 569a9666fca9d9113d9fc7f0382faf986afb036f
Parents: 0758d0b
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 29 16:02:29 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 30 13:41:05 2016 +0100
----------------------------------------------------------------------
.../org/apache/flink/runtime/jobmanager/JobManager.scala | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/569a9666/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4a4968f..cf60d4e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -746,7 +746,7 @@ class JobManager(
}
}(context.dispatcher)
- case JobStatusChanged(jobID, newJobStatus, timeStamp, error) =>
+ case msg @ JobStatusChanged(jobID, newJobStatus, timeStamp, error) =>
currentJobs.get(jobID) match {
case Some((executionGraph, jobInfo)) => executionGraph.getJobName
@@ -818,8 +818,7 @@ class JobManager(
}
}(context.dispatcher)
}
- case None =>
- self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
+ case None => log.debug(s"Received $msg for nonexistent job $jobID.")
}
case ScheduleOrUpdateConsumers(jobId, partitionId) =>
@@ -956,7 +955,7 @@ class JobManager(
futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete)
case None =>
}
- case None =>
+ case None => log.debug(s"Tried to remove nonexistent job $jobID.")
}
case RemoveCachedJob(jobID) =>
@@ -1620,7 +1619,7 @@ class JobManager(
// shutdown to release all resources.
submittedJobGraphs.removeJobGraph(jobID)
} catch {
- case t: Throwable => log.error(s"Could not remove submitted job graph $jobID.", t)
+ case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
}
}(context.dispatcher))
@@ -1629,7 +1628,7 @@ class JobManager(
archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg))
} catch {
- case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " +
+ case t: Throwable => log.warn(s"Could not prepare the execution graph $eg for " +
"archiving.", t)
}