You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2015/07/15 19:39:01 UTC
samza git commit: SAMZA-728 : Samza job fails due to null pointer in
JobCoordinator refreshJobModel
Repository: samza
Updated Branches:
refs/heads/master 6cd76a5d5 -> 3d1fd2171
SAMZA-728 : Samza job fails due to null pointer in JobCoordinator refreshJobModel
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3d1fd217
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3d1fd217
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3d1fd217
Branch: refs/heads/master
Commit: 3d1fd2171e5f426c15fff457d61d952860b3298e
Parents: 6cd76a5
Author: Navina <na...@gmail.com>
Authored: Wed Jul 15 10:38:29 2015 -0700
Committer: Navina <na...@gmail.com>
Committed: Wed Jul 15 10:38:29 2015 -0700
----------------------------------------------------------------------
.../org/apache/samza/coordinator/JobCoordinator.scala | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/3d1fd217/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 8ee034a..73c58a7 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -247,7 +247,15 @@ object JobCoordinator extends Logging {
{ case (taskName, systemStreamPartitions) =>
val checkpoint = Option(checkpointManager.readLastCheckpoint(taskName)).getOrElse(new Checkpoint(new util.HashMap[SystemStreamPartition, String]()))
// Find the system partitions which don't have a checkpoint and set null for the values for offsets
- val offsetMap = systemStreamPartitions.map(ssp => (ssp -> null)).toMap ++ checkpoint.getOffsets
+ val taskOffsets = checkpoint.getOffsets
+ val offsetMap = new util.HashMap[SystemStreamPartition, String]()
+ systemStreamPartitions.foreach {
+ ssp =>
+ if(taskOffsets.containsKey(ssp))
+ offsetMap.put(ssp, taskOffsets.get(ssp))
+ else
+ offsetMap.put(ssp, null)
+ }
val changelogPartition = Option(previousChangelogMapping.get(taskName)) match
{
case Some(changelogPartitionId) => new Partition(changelogPartitionId)