You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/08/09 17:50:58 UTC

[12/23] samza git commit: Samza-1364: Handle ZKExceptions in zkCoordinationUtils.reset.

SAMZA-1364: Handle ZKExceptions in ZkCoordinationUtils.reset.

In some cases, LocalAppRunner.waitForFinish blocks indefinitely after LocalApplicationRunner.kill. The last step in LocalAppRunner.kill(streamApp) is zkClient.close() (the zkClient belongs to ZkCoordinationService).

ApplicationRunner.kill triggers a chain of listeners, and in the final listener zkClient.close throws a ZkInterruptedException (a RuntimeException). The exception is swallowed in the listeners, which prevents the shutdownLatch update in LocalApplicationRunner (required for a proper shutdown).

Author: Shanthoosh Venkataraman <sv...@linkedin.com>

Reviewers: Boris Shkolnik <bo...@apache.org>, Navina Ramesh <na...@apache.org>

Closes #246 from shanthoosh/SAMZA-1364


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d32e8bb3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d32e8bb3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d32e8bb3

Branch: refs/heads/0.14.0
Commit: d32e8bb3a64db5ffdcded3b299e10755e8679e0f
Parents: 95d96b9
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Tue Jul 25 17:29:08 2017 -0700
Committer: navina <na...@apache.org>
Committed: Tue Jul 25 17:29:08 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/samza/zk/ZkCoordinationUtils.java   | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/d32e8bb3/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
index df0a527..f5dda2e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
@@ -18,13 +18,18 @@
  */
 package org.apache.samza.zk;
 
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.Latch;
 import org.apache.samza.coordinator.LeaderElector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class ZkCoordinationUtils implements CoordinationUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinationUtils.class);
+
   public final ZkConfig zkConfig;
   public final ZkUtils zkUtils;
   public final String processorIdStr;
@@ -37,7 +42,12 @@ public class ZkCoordinationUtils implements CoordinationUtils {
 
   @Override
   public void reset() {
-    zkUtils.close();
+    try {
+      zkUtils.close();
+    } catch (ZkInterruptedException ex) {
+      // Swallowing due to occurrence in the last stage of lifecycle(Not actionable).
+      LOG.error("Exception in reset: ", ex);
+    }
   }
 
   @Override