You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/20 12:45:58 UTC

flink git commit: [FLINK-5585] [jobmanager] Fix NullPointerException in JobManager.updateAccumulators

Repository: flink
Updated Branches:
  refs/heads/release-1.1 931929bf8 -> f6f1c244c


[FLINK-5585] [jobmanager] Fix NullPointerException in JobManager.updateAccumulators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6f1c244
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6f1c244
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6f1c244

Branch: refs/heads/release-1.1
Commit: f6f1c244cf149d451a32fb3231a6bf1168bc31d1
Parents: 931929b
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jan 20 11:12:12 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jan 20 11:14:35 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   | 23 ++++++++++----------
 1 file changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f6f1c244/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d6d23d9..1720d94 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1784,18 +1784,19 @@ class JobManager(
    *
    * @param accumulators list of accumulator snapshots
    */
-  private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]) = {
-    accumulators foreach {
-      case accumulatorEvent =>
-        currentJobs.get(accumulatorEvent.getJobID) match {
-          case Some((jobGraph, jobInfo)) =>
-            future {
-              jobGraph.updateAccumulators(accumulatorEvent)
-            }(context.dispatcher)
-          case None =>
-          // ignore accumulator values for old job
+  private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]): Unit = {
+    accumulators.foreach( snapshot => {
+        if (snapshot != null) {
+          currentJobs.get(snapshot.getJobID) match {
+            case Some((jobGraph, jobInfo)) =>
+              future {
+                jobGraph.updateAccumulators(snapshot)
+              }(context.dispatcher)
+            case None =>
+              // ignore accumulator values for old job
+          }
         }
-    }
+    })
   }
 
   /**