You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/05/19 00:29:04 UTC

samza git commit: SAMZA-1712: Tear down ZookeeperServer connections in ZkClient on interrupts.

Repository: samza
Updated Branches:
  refs/heads/master 7a2b4650c -> 72ad7523f


SAMZA-1712: Tear down ZookeeperServer connections in ZkClient on interrupts.

**Problem:**

If a thread executing `zkClient.close()` is interrupted, we currently swallow the `ZkInterruptedException` and proceed without closing the ZooKeeper connection.

This leaves the StreamProcessor's ephemeral nodes lingering in ZooKeeper after the StreamProcessor shuts down.

Users had to wait until their ZooKeeper session timed out on the server before those ephemeral nodes were deleted.

**Change:**

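Retry `zkClient.close()` once if it is interrupted (`ZkInterruptedException`). The thread's interrupted status is cleared before the retry and restored afterwards.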

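Misc changes:
* Remove unnecessary null checks on `metrics`.
* Remove unnecessary typecasts.
* Add a `deletions` counter to `ZkUtilsMetrics` and increment it when znodes are deleted.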

Author: Shanthoosh Venkataraman <sa...@gmail.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #519 from shanthoosh/handle_interrupted_exception_in_zkclient_close


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/72ad7523
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/72ad7523
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/72ad7523

Branch: refs/heads/master
Commit: 72ad7523fffdcafdc01a0c6922fc94ccd1e482a5
Parents: 7a2b465
Author: Shanthoosh Venkataraman <sa...@gmail.com>
Authored: Fri May 18 17:29:00 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Fri May 18 17:29:00 2018 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 42 ++++++--------
 .../org/apache/samza/zk/ZkUtilsMetrics.java     |  6 ++
 .../java/org/apache/samza/zk/TestZkUtils.java   | 59 +++++++++++++++++---
 3 files changed, 73 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/72ad7523/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 43f7d9c..6511603 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -78,7 +78,6 @@ public class ZkUtils {
   private static final Logger LOG = LoggerFactory.getLogger(ZkUtils.class);
   /* package private */static final String ZK_PROTOCOL_VERSION = "1.0";
 
-
   private final ZkClient zkClient;
   private volatile String ephemeralPath = null;
   private final ZkKeyBuilder keyBuilder;
@@ -105,9 +104,7 @@ public class ZkUtils {
   public void connect() throws ZkInterruptedException {
     boolean isConnected = zkClient.waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS);
     if (!isConnected) {
-      if (metrics != null) {
-        metrics.zkConnectionError.inc();
-      }
+      metrics.zkConnectionError.inc();
       throw new RuntimeException("Unable to connect to Zookeeper within connectionTimeout " + connectionTimeoutMs + "ms. Shutting down!");
     }
   }
@@ -144,6 +141,7 @@ public class ZkUtils {
       if (!isValidRegisteredProcessor(processorNode)) {
         LOG.info("Processor: {} is duplicate. Deleting zookeeper node at path: {}.", processorId, ephemeralPath);
         zkClient.delete(ephemeralPath);
+        metrics.deletions.inc();
         throw new SamzaException(String.format("Processor: %s is duplicate in the group. Registration failed.", processorId));
       }
     } else {
@@ -272,16 +270,12 @@ public class ZkUtils {
 
   public void subscribeDataChanges(String path, IZkDataListener dataListener) {
     zkClient.subscribeDataChanges(path, dataListener);
-    if (metrics != null) {
-      metrics.subscriptions.inc();
-    }
+    metrics.subscriptions.inc();
   }
 
   public void subscribeChildChanges(String path, IZkChildListener listener) {
     zkClient.subscribeChildChanges(path, listener);
-    if (metrics != null) {
-      metrics.subscriptions.inc();
-    }
+    metrics.subscriptions.inc();
   }
 
   public void unsubscribeChildChanges(String path, IZkChildListener childListener) {
@@ -290,9 +284,7 @@ public class ZkUtils {
 
   public void writeData(String path, Object object) {
     zkClient.writeData(path, object);
-    if (metrics != null) {
-      metrics.writes.inc();
-    }
+    metrics.writes.inc();
   }
 
   public boolean exists(String path) {
@@ -303,9 +295,10 @@ public class ZkUtils {
     try {
       zkClient.close();
     } catch (ZkInterruptedException e) {
-      // Swallowing due to occurrence in the last stage of lifecycle (Not actionable) and clear the interrupted status.
+      LOG.warn("Interrupted when closing zkClient. Clearing the interrupted status and retrying.", e);
       Thread.interrupted();
-      LOG.warn("Ignoring the exception when closing the zookeeper client.", e);
+      zkClient.close();
+      Thread.currentThread().interrupt();
     }
   }
 
@@ -366,9 +359,7 @@ public class ZkUtils {
   public void subscribeToJobModelVersionChange(GenIZkDataListener dataListener) {
     LOG.info(" subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
     zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
-    if (metrics != null) {
-      metrics.subscriptions.inc();
-    }
+    metrics.subscriptions.inc();
   }
 
   /**
@@ -397,7 +388,7 @@ public class ZkUtils {
    * @return job model for this version
    */
   public JobModel getJobModel(String jobModelVersion) {
-    LOG.info("read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
+    LOG.info("Read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
     Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion));
     metrics.reads.inc();
     ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
@@ -450,7 +441,7 @@ public class ZkUtils {
    */
   public void publishJobModelVersion(String oldVersion, String newVersion) {
     Stat stat = new Stat();
-    String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat);
+    String currentVersion = zkClient.readData(keyBuilder.getJobModelVersionPath(), stat);
     metrics.reads.inc();
     LOG.info("publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat
         .getVersion() + ")");
@@ -491,7 +482,7 @@ public class ZkUtils {
     }
     // if exists, verify the version
     Stat stat = new Stat();
-    String version = (String) zkClient.readData(rootPath, stat);
+    String version = zkClient.readData(rootPath, stat);
     if (version == null) {
       // for backward compatibility, if no value - assume 1.0
       try {
@@ -500,7 +491,7 @@ public class ZkUtils {
         // if the write failed with ZkBadVersionException it means someone else already wrote a version, so we can ignore it.
       }
       // re-read the updated version
-      version = (String) zkClient.readData(rootPath);
+      version = zkClient.readData(rootPath);
     }
     LOG.info("Current version for zk root node: " + rootPath + " is " + version + ", expected version is " + ZK_PROTOCOL_VERSION);
     if (!version.equals(ZK_PROTOCOL_VERSION)) {
@@ -525,7 +516,7 @@ public class ZkUtils {
    * @param listener - will be called when a processor is added or removed.
    */
   public void subscribeToProcessorChange(IZkChildListener listener) {
-    LOG.info("subscribing for child change at:" + keyBuilder.getProcessorsPath());
+    LOG.info("Subscribing for child change at:" + keyBuilder.getProcessorsPath());
     zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener);
     metrics.subscriptions.inc();
   }
@@ -542,7 +533,7 @@ public class ZkUtils {
   void deleteOldJobModels(int numVersionsToLeave) {
     // read current list of JMs
     String path = keyBuilder.getJobModelPathPrefix();
-    LOG.info("about to delete jm path=" + path);
+    LOG.info("About to delete jm path=" + path);
     List<String> znodeIds = zkClient.getChildren(path);
     deleteOldVersionPath(path, znodeIds, numVersionsToLeave, new Comparator<String>() {
       @Override
@@ -556,7 +547,7 @@ public class ZkUtils {
   void deleteOldBarrierVersions(int numVersionsToLeave) {
     // read current list of barriers
     String path = keyBuilder.getJobModelVersionBarrierPrefix();
-    LOG.info("about to delete old barrier paths from " + path);
+    LOG.info("About to delete old barrier paths from " + path);
     List<String> znodeIds = zkClient.getChildren(path);
     LOG.info("List of all zkNodes: " + znodeIds);
     deleteOldVersionPath(path, znodeIds, numVersionsToLeave,  new Comparator<String>() {
@@ -584,6 +575,7 @@ public class ZkUtils {
         try {
           LOG.info("deleting " + pathToDelete);
           zkClient.deleteRecursive(pathToDelete);
+          metrics.deletions.inc();
         } catch (Exception e) {
           LOG.warn("delete of node " + pathToDelete + " failed.", e);
         }

http://git-wip-us.apache.org/repos/asf/samza/blob/72ad7523/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java
index b9f4aa8..335fbb1 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java
@@ -46,10 +46,16 @@ public class ZkUtilsMetrics extends MetricsBase {
    */
   public final Counter zkConnectionError;
 
+  /**
+   * Number of zookeeper data node deletions.
+   */
+  public final Counter deletions;
+
   public ZkUtilsMetrics(MetricsRegistry metricsRegistry) {
     super(metricsRegistry);
     this.reads = newCounter("reads");
     this.writes = newCounter("writes");
+    this.deletions = newCounter("deletions");
     this.subscriptions = newCounter("subscriptions");
     this.zkConnectionError = newCounter("zk-connection-errors");
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/72ad7523/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 1dfb414..ee523aa 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -26,6 +26,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.function.BooleanSupplier;
 import com.google.common.collect.ImmutableList;
 import org.I0Itec.zkclient.IZkDataListener;
@@ -48,6 +49,7 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.rules.Timeout;
 import org.mockito.Mockito;
 
 public class TestZkUtils {
@@ -62,6 +64,9 @@ public class TestZkUtils {
   // Declared public to honor junit contract.
   public final ExpectedException expectedException = ExpectedException.none();
 
+  @Rule
+  public Timeout testTimeOutInMillis = new Timeout(120000);
+
   @BeforeClass
   public static void setup() throws InterruptedException {
     zkServer = new EmbeddedZookeeper();
@@ -410,15 +415,6 @@ public class TestZkUtils {
 
   }
 
-  @Test
-  public void testCloseShouldNotThrowZkInterruptedExceptionToCaller() {
-    ZkClient zkClient = Mockito.mock(ZkClient.class);
-    ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient,
-            SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
-    Mockito.doThrow(new ZkInterruptedException(new InterruptedException())).when(zkClient).close();
-    zkUtils.close();
-  }
-
   public static boolean testWithDelayBackOff(BooleanSupplier cond, long startDelayMs, long maxDelayMs) {
     long delay = startDelayMs;
     while (delay < maxDelayMs) {
@@ -463,4 +459,49 @@ public class TestZkUtils {
     // Get on the JobModel version should return 2, taking into account the published version 2.
     Assert.assertEquals("3", zkUtils.getNextJobModelVersion(zkUtils.getJobModelVersion()));
   }
+
+
+  @Test
+  public void testCloseShouldRetryOnceOnInterruptedException() {
+    ZkClient zkClient = Mockito.mock(ZkClient.class);
+    ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient,
+        SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
+
+    Mockito.doThrow(new ZkInterruptedException(new InterruptedException()))
+           .doAnswer(invocation -> null)
+           .when(zkClient).close();
+
+    zkUtils.close();
+
+    Mockito.verify(zkClient, Mockito.times(2)).close();
+  }
+
+  @Test
+  public void testCloseShouldTearDownZkConnectionOnInterruptedException() throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    // Establish connection with the zookeeper server.
+    ZkClient zkClient = new ZkClient("127.0.0.1:" + zkServer.getPort());
+    ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
+
+    Thread threadToInterrupt = new Thread(() -> {
+        try {
+          latch.await();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        zkUtils.close();
+      });
+
+    threadToInterrupt.start();
+
+    Field field = ZkClient.class.getDeclaredField("_closed");
+    field.setAccessible(true);
+
+    Assert.assertFalse(field.getBoolean(zkClient));
+
+    threadToInterrupt.interrupt();
+    threadToInterrupt.join();
+
+    Assert.assertTrue(field.getBoolean(zkClient));
+  }
 }