You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/08/03 21:32:20 UTC

samza git commit: SAMZA-1365: Calling zkClient.close from zkWatch impl blocks indefinitely.

Repository: samza
Updated Branches:
  refs/heads/master fde556440 -> 0a4ecb232


SAMZA-1365: Calling zkClient.close from zkWatch impl blocks indefinitely.

Author: Shanthoosh Venkataraman <sv...@linkedin.com>

Reviewers: Navina Ramesh <na...@apache.org>

Closes #253 from shanthoosh/SAMZA-1365


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0a4ecb23
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0a4ecb23
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0a4ecb23

Branch: refs/heads/master
Commit: 0a4ecb232a4fec0d083ecc065a0ad8d4518600ae
Parents: fde5564
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Thu Aug 3 14:32:09 2017 -0700
Committer: navina <na...@apache.org>
Committed: Thu Aug 3 14:32:09 2017 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/samza/zk/ZkJobCoordinator.java      | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/0a4ecb23/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 2204240..9f64b3a 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -333,6 +333,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   /// listener to handle session expiration
   class ZkSessionStateChangedListener implements IZkStateListener {
 
+    private static final String ZK_SESSION_ERROR = "ZK_SESSION_ERROR";
+
     @Override
     public void handleStateChanged(Watcher.Event.KeeperState state)
         throws Exception {
@@ -367,7 +369,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
         throws Exception {
       // this means we cannot connect to zookeeper
       LOG.info("handleSessionEstablishmentError received for processor=" + processorId, error);
-      stop();
+      debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop());
     }
   }