You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/30 12:41:44 UTC

flink git commit: [FLINK-5197] [jm] Ignore outdated JobStatusChanged messages

Repository: flink
Updated Branches:
  refs/heads/release-1.1 0758d0be6 -> 569a9666f


[FLINK-5197] [jm] Ignore outdated JobStatusChanged messages

Outdated JobStatusChanged messages no longer trigger a RemoveJob message but are
logged and ignored. This has the advantage, that an outdated JobStatusChanged message
cannot interfere with a recovered job which can have the same job id.

This closes #2896.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/569a9666
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/569a9666
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/569a9666

Branch: refs/heads/release-1.1
Commit: 569a9666fca9d9113d9fc7f0382faf986afb036f
Parents: 0758d0b
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 29 16:02:29 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 30 13:41:05 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/runtime/jobmanager/JobManager.scala | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/569a9666/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4a4968f..cf60d4e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -746,7 +746,7 @@ class JobManager(
         }
       }(context.dispatcher)
 
-    case JobStatusChanged(jobID, newJobStatus, timeStamp, error) =>
+    case msg @ JobStatusChanged(jobID, newJobStatus, timeStamp, error) =>
       currentJobs.get(jobID) match {
         case Some((executionGraph, jobInfo)) => executionGraph.getJobName
 
@@ -818,8 +818,7 @@ class JobManager(
               }
             }(context.dispatcher)
           }
-        case None =>
-          self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
+        case None => log.debug(s"Received $msg for nonexistent job $jobID.")
       }
 
     case ScheduleOrUpdateConsumers(jobId, partitionId) =>
@@ -956,7 +955,7 @@ class JobManager(
                 futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete)
               case None =>
             }
-        case None =>
+        case None => log.debug(s"Tried to remove nonexistent job $jobID.")
       }
 
     case RemoveCachedJob(jobID) =>
@@ -1620,7 +1619,7 @@ class JobManager(
               // shutdown to release all resources.
               submittedJobGraphs.removeJobGraph(jobID)
             } catch {
-              case t: Throwable => log.error(s"Could not remove submitted job graph $jobID.", t)
+              case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
             }
           }(context.dispatcher))
 
@@ -1629,7 +1628,7 @@ class JobManager(
 
             archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg))
           } catch {
-            case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " +
+            case t: Throwable => log.warn(s"Could not prepare the execution graph $eg for " +
               "archiving.", t)
           }