You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/05/04 00:03:16 UTC
samza git commit: SAMZA-1257: make sure a dataChange listener in LeaderElection is always initialized.
Repository: samza
Updated Branches:
refs/heads/master 475b4654c -> 8bcfded46
SAMZA-1257: make sure a dataChange listener in LeaderElection is always initialized.
xinyuiscool, navina: please review.
Author: Boris Shkolnik <bo...@apache.org>
Reviewers: Navina Ramesh <na...@apache.org>
Closes #156 from sborya/SAMZA-1257
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8bcfded4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8bcfded4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8bcfded4
Branch: refs/heads/master
Commit: 8bcfded46a4617cec6d98bc1db79fa13978916bb
Parents: 475b465
Author: Boris Shkolnik <bo...@apache.org>
Authored: Wed May 3 17:03:08 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Wed May 3 17:03:08 2017 -0700
----------------------------------------------------------------------
.../main/java/org/apache/samza/zk/ZkLeaderElector.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/8bcfded4/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index 644864a..4ffe3e4 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -105,17 +105,17 @@ public class ZkLeaderElector implements LeaderElector {
LOG.info("Index = " + index + " Not eligible to be a leader yet!");
String predecessor = children.get(index - 1);
if (!predecessor.equals(currentSubscription)) {
- if (currentSubscription != null) {
-
- // callback in case if the previous node gets deleted (when previous processor dies)
- if (previousProcessorChangeListener == null)
- previousProcessorChangeListener = new PreviousProcessorChangeListener(leaderElectorListener);
+ if (currentSubscription != null) {
LOG.debug(zLog("Unsubscribing data change for " + currentSubscription));
zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
previousProcessorChangeListener);
}
currentSubscription = predecessor;
+ // callback in case if the previous node gets deleted (when previous processor dies)
+ if (previousProcessorChangeListener == null)
+ previousProcessorChangeListener = new PreviousProcessorChangeListener(leaderElectorListener);
+
LOG.info(zLog("Subscribing data change for " + predecessor));
zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
previousProcessorChangeListener);