You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/20 12:45:58 UTC
flink git commit: [FLINK-5585] [jobmanager] Fix NullPointerException
in JobManager.updateAccumulators
Repository: flink
Updated Branches:
refs/heads/release-1.1 931929bf8 -> f6f1c244c
[FLINK-5585] [jobmanager] Fix NullPointerException in JobManager.updateAccumulators
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6f1c244
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6f1c244
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6f1c244
Branch: refs/heads/release-1.1
Commit: f6f1c244cf149d451a32fb3231a6bf1168bc31d1
Parents: 931929b
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jan 20 11:12:12 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jan 20 11:14:35 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/jobmanager/JobManager.scala | 23 ++++++++++----------
1 file changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f6f1c244/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d6d23d9..1720d94 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1784,18 +1784,19 @@ class JobManager(
*
* @param accumulators list of accumulator snapshots
*/
- private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]) = {
- accumulators foreach {
- case accumulatorEvent =>
- currentJobs.get(accumulatorEvent.getJobID) match {
- case Some((jobGraph, jobInfo)) =>
- future {
- jobGraph.updateAccumulators(accumulatorEvent)
- }(context.dispatcher)
- case None =>
- // ignore accumulator values for old job
+ private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]): Unit = {
+ accumulators.foreach( snapshot => {
+ if (snapshot != null) {
+ currentJobs.get(snapshot.getJobID) match {
+ case Some((jobGraph, jobInfo)) =>
+ future {
+ jobGraph.updateAccumulators(snapshot)
+ }(context.dispatcher)
+ case None =>
+ // ignore accumulator values for old job
+ }
}
- }
+ })
}
/**