You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2015/07/15 19:39:01 UTC

samza git commit: SAMZA-728 : Samza job fails due to null pointer in JobCoordinator refreshJobModel

Repository: samza
Updated Branches:
  refs/heads/master 6cd76a5d5 -> 3d1fd2171


SAMZA-728 : Samza job fails due to null pointer in JobCoordinator refreshJobModel


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3d1fd217
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3d1fd217
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3d1fd217

Branch: refs/heads/master
Commit: 3d1fd2171e5f426c15fff457d61d952860b3298e
Parents: 6cd76a5
Author: Navina <na...@gmail.com>
Authored: Wed Jul 15 10:38:29 2015 -0700
Committer: Navina <na...@gmail.com>
Committed: Wed Jul 15 10:38:29 2015 -0700

----------------------------------------------------------------------
 .../org/apache/samza/coordinator/JobCoordinator.scala     | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3d1fd217/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 8ee034a..73c58a7 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -247,7 +247,15 @@ object JobCoordinator extends Logging {
                 { case (taskName, systemStreamPartitions) =>
                   val checkpoint = Option(checkpointManager.readLastCheckpoint(taskName)).getOrElse(new Checkpoint(new util.HashMap[SystemStreamPartition, String]()))
                   // Find the system partitions which don't have a checkpoint and set null for the values for offsets
-                  val offsetMap = systemStreamPartitions.map(ssp => (ssp -> null)).toMap ++ checkpoint.getOffsets
+                  val taskOffsets = checkpoint.getOffsets
+                  val offsetMap = new util.HashMap[SystemStreamPartition, String]()
+                  systemStreamPartitions.foreach {
+                    ssp =>
+                      if(taskOffsets.containsKey(ssp))
+                        offsetMap.put(ssp, taskOffsets.get(ssp))
+                      else
+                        offsetMap.put(ssp, null)
+                  }
                   val changelogPartition = Option(previousChangelogMapping.get(taskName)) match
                   {
                     case Some(changelogPartitionId) => new Partition(changelogPartitionId)