You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/10/24 23:38:40 UTC

git commit: SAMZA-445; close checkpoint manager in util after changelog partitions are written

Repository: incubator-samza
Updated Branches:
  refs/heads/0.8.0 f484a7076 -> 5862f1391


SAMZA-445; close checkpoint manager in util after changelog partitions are written


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/5862f139
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/5862f139
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/5862f139

Branch: refs/heads/0.8.0
Commit: 5862f13911891f5566417622713b5e87057261dd
Parents: f484a70
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Fri Oct 24 14:38:29 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Fri Oct 24 14:38:29 2014 -0700

----------------------------------------------------------------------
 samza-core/src/main/scala/org/apache/samza/util/Util.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5862f139/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 7d50352..1c7680f 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -281,8 +281,6 @@ object Util extends Logging {
         fromCM.map(kv => kv._1 -> kv._2.intValue()).toMap // Java to Scala interop!!!
       }
 
-      checkpointManager.stop
-
       val newMapping = Util.resolveTaskNameToChangelogPartitionMapping(currentTaskNames, previousMapping)
 
       if (newMapping != null) {
@@ -290,6 +288,8 @@ object Util extends Logging {
         checkpointManager.writeChangeLogPartitionMapping(newMapping.map(kv => kv._1 -> new java.lang.Integer(kv._2))) //Java to Scala interop!!!
       }
 
+      checkpointManager.stop
+
       newMapping
     }
   }