You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/09 13:57:03 UTC
flink git commit: [hotfix] Move logging of JobStatus changes into the
ExecutionGraph
Repository: flink
Updated Branches:
refs/heads/master ccf35cf20 -> 7a5189525
[hotfix] Move logging of JobStatus changes into the ExecutionGraph
Previously, the JobManager was responsible for logging the JobStatus changes. This introduced
out-of-order logging, since the JM was a mere job status listener that was notified by
an asynchronous message.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a518952
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a518952
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a518952
Branch: refs/heads/master
Commit: 7a518952512238099ea8350156d38cfdbe9871ea
Parents: ccf35cf
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 9 12:19:10 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 9 12:19:10 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/executiongraph/ExecutionGraph.java | 14 ++++++--------
.../apache/flink/runtime/jobmanager/JobManager.scala | 4 ----
2 files changed, 6 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7a518952/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 8a4f3ef..1231b45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -970,9 +970,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
if (STATE_UPDATER.compareAndSet(this, current, newState)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} switched from {} to {}.", this.getJobName(), current, newState);
- }
+ LOG.info("Job {} ({}) switched from state {} to {}.", jobName, jobID, current, newState, error);
stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
notifyJobStatusChange(newState, error);
@@ -1051,20 +1049,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {
synchronized (progressLock) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Try to restart the job or fail it if no longer possible.", failureCause);
+ LOG.debug("Try to restart or fail the job {} ({}) if no longer possible.", jobName, jobID, failureCause);
} else {
- LOG.info("Try to restart the job or fail it if no longer possible.");
+ LOG.info("Try to restart or fail the job {} ({}) if no longer possible.", jobName, jobID);
}
boolean isRestartable = !(failureCause instanceof SuppressRestartsException) && restartStrategy.canRestart();
if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) {
- LOG.info("Restarting the job...");
+ LOG.info("Restarting the job {} ({}).", jobName, jobID);
restartStrategy.restart(this);
return true;
} else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) {
- LOG.info("Could not restart the job.", failureCause);
+ LOG.info("Could not restart the job {} ({}).", jobName, jobID, failureCause);
postRunCleanup();
return true;
@@ -1195,7 +1193,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
if (execution != null) {
execution.setAccumulators(userAccumulators);
} else {
- LOG.warn("Received accumulator result for unknown execution {}.", execID);
+ LOG.debug("Received accumulator result for unknown execution {}.", execID);
}
} catch (Exception e) {
LOG.error("Cannot update accumulators for job {}.", jobID, e);
http://git-wip-us.apache.org/repos/asf/flink/blob/7a518952/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 31f9dd7..9af5355 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -842,10 +842,6 @@ class JobManager(
currentJobs.get(jobID) match {
case Some((executionGraph, jobInfo)) => executionGraph.getJobName
- log.info(
- s"Status of job $jobID (${executionGraph.getJobName}) changed to $newJobStatus.",
- error)
-
if (newJobStatus.isGloballyTerminalState()) {
jobInfo.end = timeStamp