You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/05/04 00:03:16 UTC

samza git commit: SAMZA-1257: make sure a dataChange listener in LeaderElection is always initialized.

Repository: samza
Updated Branches:
  refs/heads/master 475b4654c -> 8bcfded46


SAMZA-1257: make sure a dataChange listener in LeaderElection is always initialized.

xinyuiscool, navina please review.

Author: Boris Shkolnik <bo...@apache.org>

Reviewers: Navina Ramesh <na...@apache.org>

Closes #156 from sborya/SAMZA-1257


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8bcfded4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8bcfded4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8bcfded4

Branch: refs/heads/master
Commit: 8bcfded46a4617cec6d98bc1db79fa13978916bb
Parents: 475b465
Author: Boris Shkolnik <bo...@apache.org>
Authored: Wed May 3 17:03:08 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Wed May 3 17:03:08 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/samza/zk/ZkLeaderElector.java    | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/8bcfded4/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index 644864a..4ffe3e4 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -105,17 +105,17 @@ public class ZkLeaderElector implements LeaderElector {
     LOG.info("Index = " + index + " Not eligible to be a leader yet!");
     String predecessor = children.get(index - 1);
     if (!predecessor.equals(currentSubscription)) {
-      if (currentSubscription != null) {
-
-        // callback in case if the previous node gets deleted (when previous processor dies)
-        if (previousProcessorChangeListener == null)
-          previousProcessorChangeListener =  new PreviousProcessorChangeListener(leaderElectorListener);
 
+      if (currentSubscription != null) {
         LOG.debug(zLog("Unsubscribing data change for " + currentSubscription));
         zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
             previousProcessorChangeListener);
       }
       currentSubscription = predecessor;
+      // callback in case if the previous node gets deleted (when previous processor dies)
+      if (previousProcessorChangeListener == null)
+        previousProcessorChangeListener =  new PreviousProcessorChangeListener(leaderElectorListener);
+
       LOG.info(zLog("Subscribing data change for " + predecessor));
       zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
           previousProcessorChangeListener);