You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/09 13:57:03 UTC

flink git commit: [hotfix] Move logging of JobStatus changes into the ExecutionGraph

Repository: flink
Updated Branches:
  refs/heads/master ccf35cf20 -> 7a5189525


[hotfix] Move logging of JobStatus changes into the ExecutionGraph

Previously, the JobManager was responsible for logging the JobStatus changes. This introduced
out-of-order logging, since the JM was merely a job status listener that was notified by
an asynchronous message.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a518952
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a518952
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a518952

Branch: refs/heads/master
Commit: 7a518952512238099ea8350156d38cfdbe9871ea
Parents: ccf35cf
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 9 12:19:10 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 9 12:19:10 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/ExecutionGraph.java  | 14 ++++++--------
 .../apache/flink/runtime/jobmanager/JobManager.scala  |  4 ----
 2 files changed, 6 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a518952/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 8a4f3ef..1231b45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -970,9 +970,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 	private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
 		if (STATE_UPDATER.compareAndSet(this, current, newState)) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("{} switched from {} to {}.", this.getJobName(), current, newState);
-			}
+			LOG.info("Job {} ({}) switched from state {} to {}.", jobName, jobID, current, newState, error);
 
 			stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
 			notifyJobStatusChange(newState, error);
@@ -1051,20 +1049,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {
 			synchronized (progressLock) {
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Try to restart the job or fail it if no longer possible.", failureCause);
+					LOG.debug("Try to restart or fail the job {} ({}) if no longer possible.", jobName, jobID, failureCause);
 				} else {
-					LOG.info("Try to restart the job or fail it if no longer possible.");
+					LOG.info("Try to restart or fail the job {} ({}) if no longer possible.", jobName, jobID);
 				}
 
 				boolean isRestartable = !(failureCause instanceof SuppressRestartsException) && restartStrategy.canRestart();
 
 				if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) {
-					LOG.info("Restarting the job...");
+					LOG.info("Restarting the job {} ({}).", jobName, jobID);
 					restartStrategy.restart(this);
 
 					return true;
 				} else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) {
-					LOG.info("Could not restart the job.", failureCause);
+					LOG.info("Could not restart the job {} ({}).", jobName, jobID, failureCause);
 					postRunCleanup();
 
 					return true;
@@ -1195,7 +1193,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			if (execution != null) {
 				execution.setAccumulators(userAccumulators);
 			} else {
-				LOG.warn("Received accumulator result for unknown execution {}.", execID);
+				LOG.debug("Received accumulator result for unknown execution {}.", execID);
 			}
 		} catch (Exception e) {
 			LOG.error("Cannot update accumulators for job {}.", jobID, e);

http://git-wip-us.apache.org/repos/asf/flink/blob/7a518952/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 31f9dd7..9af5355 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -842,10 +842,6 @@ class JobManager(
       currentJobs.get(jobID) match {
         case Some((executionGraph, jobInfo)) => executionGraph.getJobName
 
-          log.info(
-            s"Status of job $jobID (${executionGraph.getJobName}) changed to $newJobStatus.",
-            error)
-
           if (newJobStatus.isGloballyTerminalState()) {
             jobInfo.end = timeStamp