You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/08/09 17:50:58 UTC
[12/23] samza git commit: Samza-1364: Handle ZKExceptions in
zkCoordinationUtils.reset.
Samza-1364: Handle ZKExceptions in zkCoordinationUtils.reset.
In some cases, LocalApplicationRunner.waitForFinish blocks indefinitely after LocalApplicationRunner.kill is called. The last step in LocalApplicationRunner.kill(streamApp) is zkClient.close() (the zkClient belongs to ZkCoordinationService).
ApplicationRunner.kill triggers a chain of listeners, and in the final listener zkClient.close throws a ZkInterruptedException (a RuntimeException). That exception is swallowed by the listeners, which prevents the shutdownLatch in LocalApplicationRunner from being updated (the update is required for a proper shutdown).
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Reviewers: Boris Shkolnik <bo...@apache.org>, Navina Ramesh <na...@apache.org>
Closes #246 from shanthoosh/SAMZA-1364
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d32e8bb3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d32e8bb3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d32e8bb3
Branch: refs/heads/0.14.0
Commit: d32e8bb3a64db5ffdcded3b299e10755e8679e0f
Parents: 95d96b9
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Tue Jul 25 17:29:08 2017 -0700
Committer: navina <na...@apache.org>
Committed: Tue Jul 25 17:29:08 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/samza/zk/ZkCoordinationUtils.java | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/d32e8bb3/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
index df0a527..f5dda2e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
@@ -18,13 +18,18 @@
*/
package org.apache.samza.zk;
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.Latch;
import org.apache.samza.coordinator.LeaderElector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ZkCoordinationUtils implements CoordinationUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinationUtils.class);
+
public final ZkConfig zkConfig;
public final ZkUtils zkUtils;
public final String processorIdStr;
@@ -37,7 +42,12 @@ public class ZkCoordinationUtils implements CoordinationUtils {
@Override
public void reset() {
- zkUtils.close();
+ try {
+ zkUtils.close();
+ } catch (ZkInterruptedException ex) {
+ // Swallowing due to occurrence in the last stage of lifecycle(Not actionable).
+ LOG.error("Exception in reset: ", ex);
+ }
}
@Override