You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/08/03 21:32:20 UTC
samza git commit: SAMZA-1365: Calling zkClient.close from zkWatch impl blocks indefinitely.
Repository: samza
Updated Branches:
refs/heads/master fde556440 -> 0a4ecb232
SAMZA-1365: Calling zkClient.close from zkWatch impl blocks indefinitely.
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Reviewers: Navina Ramesh <na...@apache.org>
Closes #253 from shanthoosh/SAMZA-1365
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0a4ecb23
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0a4ecb23
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0a4ecb23
Branch: refs/heads/master
Commit: 0a4ecb232a4fec0d083ecc065a0ad8d4518600ae
Parents: fde5564
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Thu Aug 3 14:32:09 2017 -0700
Committer: navina <na...@apache.org>
Committed: Thu Aug 3 14:32:09 2017 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/samza/zk/ZkJobCoordinator.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/0a4ecb23/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 2204240..9f64b3a 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -333,6 +333,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
/// listener to handle session expiration
class ZkSessionStateChangedListener implements IZkStateListener {
+ private static final String ZK_SESSION_ERROR = "ZK_SESSION_ERROR";
+
@Override
public void handleStateChanged(Watcher.Event.KeeperState state)
throws Exception {
@@ -367,7 +369,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
throws Exception {
// this means we cannot connect to zookeeper
LOG.info("handleSessionEstablishmentError received for processor=" + processorId, error);
- stop();
+ debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop());
}
}