You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/10/04 20:21:46 UTC

git commit: [HELIX-264] fix zkclient#close() bug, rb=14483

Updated Branches:
  refs/heads/master 52717e40d -> b9fe73879


[HELIX-264] fix zkclient#close() bug, rb=14483


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/b9fe7387
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/b9fe7387
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/b9fe7387

Branch: refs/heads/master
Commit: b9fe738797cd5228e8ecaa284c8874bfa19f1ff2
Parents: 52717e4
Author: zzhang <zz...@apache.org>
Authored: Fri Oct 4 11:20:17 2013 -0700
Committer: zzhang <zz...@apache.org>
Committed: Fri Oct 4 11:20:17 2013 -0700

----------------------------------------------------------------------
 .../org/apache/helix/manager/zk/ZkClient.java   |  44 ++-
 .../java/org/apache/helix/ZkTestHelper.java     |  46 +++-
 .../apache/helix/manager/zk/TestZkFlapping.java | 272 +++++++++++++++++++
 3 files changed, 349 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b9fe7387/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index 5b3af6d..5f58f1b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -62,8 +62,7 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
     _zkSerializer = zkSerializer;
     if (LOG.isTraceEnabled()) {
       StackTraceElement[] calls = Thread.currentThread().getStackTrace();
-      int min = Math.min(calls.length, 5);
-      LOG.trace("creating a zkclient. callstack: " + Arrays.asList(calls).subList(0, min));
+      LOG.trace("creating a zkclient. callstack: " + Arrays.asList(calls));
     }
   }
 
@@ -122,10 +121,45 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
   public void close() throws ZkInterruptedException {
+    // Close the zk connection. This is a no-op when already closed; it also works when invoked
+    // from this client's own event thread (HELIX-264), see the catch block below.
     if (LOG.isTraceEnabled()) {
       StackTraceElement[] calls = Thread.currentThread().getStackTrace();
-      int min = Math.min(calls.length, 5);
-      LOG.trace("closing a zkclient. callStack: " + Arrays.asList(calls).subList(0, min));
+      LOG.trace("closing a zkclient. callStack: " + Arrays.asList(calls));
+    }
+
+    getEventLock().lock();
+    try {
+      if (_connection == null) {
+        // already closed: nothing to do, and nothing to report as "Closed"
+        return;
+      }
+
+      LOG.info("Closing zkclient: " + ((ZkConnection) _connection).getZookeeper());
+      super.close();
+      // only reached when super.close() completed without throwing
+      LOG.info("Closed zkclient");
+    } catch (ZkInterruptedException e) {
+      /**
+       * HELIX-264: calling ZkClient#close() in its own eventThread context will
+       * throw ZkInterruptedException and skip ZkConnection#close()
+       */
+      if (_connection != null) {
+        try {
+          /**
+           * ZkInterruptedException#construct() honors InterruptedException by calling
+           * Thread.currentThread().interrupt(); clear it first, so we can safely close the
+           * zk-connection
+           */
+          Thread.interrupted();
+          _connection.close();
+          _connection = null;
+          LOG.info("Closed zkclient");
+
+          /**
+           * restore interrupted status of current thread
+           */
+          Thread.currentThread().interrupt();
+        } catch (InterruptedException e1) {
+          throw new ZkInterruptedException(e1);
+        }
+      }
+    } finally {
+      getEventLock().unlock();
     }
-    super.close();
   }
 
   public Stat getStat(final String path) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b9fe7387/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index db2a6d0..a43bba1 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -20,6 +20,7 @@ package org.apache.helix;
  */
 
 import java.io.BufferedReader;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.net.Socket;
@@ -43,9 +44,11 @@ import org.apache.helix.model.ExternalView;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
+import org.testng.Assert;
 
 public class ZkTestHelper {
   private static Logger LOG = Logger.getLogger(ZkTestHelper.class);
@@ -102,13 +105,37 @@ public class ZkTestHelper {
     LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
   }
 
+  /**
+   * Make a zkclient believe zk reported a {@link KeeperState#Disconnected} state change. A
+   * synthetic {@link WatchedEvent} (type {@link EventType#None}, no path) is handed straight to
+   * {@link ZkClient#process(WatchedEvent)}; this helper itself does not touch the real connection.
+   * @param client the zkclient that should observe the simulated disconnect
+   */
+  public static void simulateZkStateDisconnected(ZkClient client) {
+    client.process(new WatchedEvent(EventType.None, KeeperState.Disconnected, null));
+  }
+
+  /**
+   * Get zk connection session id
+   * @param client
+   * @return
+   */
+  public static String getSessionId(ZkClient client) {
+    ZkConnection connection = ((ZkConnection) client.getConnection());
+    ZooKeeper curZookeeper = connection.getZookeeper();
+    return Long.toHexString(curZookeeper.getSessionId());
+  }
+
+  /**
+   * Expire current zk session and wait for {@link IZkStateListener#handleNewSession()} invoked
+   * @param zkClient the zkclient whose current session is expired
+   * @throws Exception if waiting is interrupted or the session id did not change
+   */
   public static void expireSession(final ZkClient zkClient) throws Exception {
-    final CountDownLatch waitExpire = new CountDownLatch(1);
+    final CountDownLatch waitNewSession = new CountDownLatch(1);
 
     IZkStateListener listener = new IZkStateListener() {
       @Override
       public void handleStateChanged(KeeperState state) throws Exception {
-        // System.err.println("handleStateChanged. state: " + state);
+        LOG.info("IZkStateListener#handleStateChanged, state: " + state);
       }
 
       @Override
@@ -120,7 +147,7 @@ public class ZkTestHelper {
         ZooKeeper curZookeeper = connection.getZookeeper();
 
         LOG.info("handleNewSession. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
-        waitExpire.countDown();
+        waitNewSession.countDown();
       }
     };
 
@@ -128,12 +155,13 @@ public class ZkTestHelper {
 
     ZkConnection connection = ((ZkConnection) zkClient.getConnection());
     ZooKeeper curZookeeper = connection.getZookeeper();
-    LOG.info("Before expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
+    String oldSessionId = Long.toHexString(curZookeeper.getSessionId());
+    LOG.info("Before session expiry. sessionId: " + oldSessionId + ", zk: " + curZookeeper);
 
     Watcher watcher = new Watcher() {
       @Override
       public void process(WatchedEvent event) {
-        LOG.info("Process watchEvent: " + event);
+        LOG.info("Watcher#process, event: " + event);
       }
     };
 
@@ -144,17 +172,19 @@ public class ZkTestHelper {
     while (dupZookeeper.getState() != States.CONNECTED) {
       Thread.sleep(10);
     }
+    Assert.assertEquals(dupZookeeper.getState(), States.CONNECTED, "Fail to connect to zk using current session info");
     dupZookeeper.close();
 
     // make sure session expiry really happens
-    waitExpire.await();
+    waitNewSession.await();
     zkClient.unsubscribeStateChanges(listener);
 
     connection = (ZkConnection) zkClient.getConnection();
     curZookeeper = connection.getZookeeper();
 
-    // System.err.println("zk: " + oldZookeeper);
-    LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
+    String newSessionId = Long.toHexString(curZookeeper.getSessionId());
+    LOG.info("After session expiry. sessionId: " + newSessionId + ", zk: " + curZookeeper);
+    // compare by value: assertNotSame() only checks reference identity, not string content
+    Assert.assertFalse(newSessionId.equals(oldSessionId), "Fail to expire current session, zk: " + curZookeeper);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b9fe7387/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
new file mode 100644
index 0000000..5b35148
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
@@ -0,0 +1,272 @@
+package org.apache.helix.manager.zk;
+
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.Verifier;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.model.LiveInstance;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests zkclient close / session-expiry behavior (HELIX-264), and how participants and
+ * controllers react to zk-connection flapping: more than
+ * {@link ZKHelixManager#MAX_DISCONNECT_THRESHOLD} Disconnected events.
+ */
+public class TestZkFlapping extends ZkUnitTestBase {
+
+  @Test
+  public void testZkSessionExpiry() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    ZkClient client =
+        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
+            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+    try {
+      String path = String.format("/%s", clusterName);
+      client.createEphemeral(path);
+      String oldSessionId = ZkTestHelper.getSessionId(client);
+      ZkTestHelper.expireSession(client);
+      String newSessionId = ZkTestHelper.getSessionId(client);
+      // compare by value: assertNotSame() only checks reference identity, not string content
+      Assert.assertFalse(newSessionId.equals(oldSessionId),
+          "Session id should change after session expiry");
+      Assert.assertFalse(client.exists(path), "Ephemeral znode should be gone after session expiry");
+    } finally {
+      // release the zk connection even if an assertion above fails
+      client.close();
+    }
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testCloseZkClient() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    ZkClient client =
+        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
+            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+    String path = String.format("/%s", clusterName);
+    client.createEphemeral(path);
+
+    client.close();
+    Assert.assertFalse(_gZkClient.exists(path), "Ephemeral node: " + path
+        + " should be removed after ZkClient#close()");
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testCloseZkClientInZkClientEventThread() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    final CountDownLatch waitCallback = new CountDownLatch(1);
+    final ZkClient client =
+        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
+            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+    String path = String.format("/%s", clusterName);
+    client.createEphemeral(path);
+    client.subscribeDataChanges(path, new IZkDataListener() {
+
+      @Override
+      public void handleDataDeleted(String dataPath) throws Exception {
+      }
+
+      @Override
+      public void handleDataChange(String dataPath, Object data) throws Exception {
+        try {
+          // close the client from inside one of its own listener callbacks (HELIX-264)
+          client.close();
+        } finally {
+          // always release the test thread, even if close() throws
+          waitCallback.countDown();
+        }
+      }
+    });
+
+    client.writeData(path, new ZNRecord("test"));
+    // bounded wait: fail instead of hanging forever if the callback never fires
+    Assert.assertTrue(waitCallback.await(30, TimeUnit.SECONDS),
+        "Data-change callback should be invoked after ZkClient#writeData()");
+    Assert.assertFalse(_gZkClient.exists(path), "Ephemeral node: " + path
+        + " should be removed after ZkClient#close() in its own event-thread");
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+  }
+
+  /**
+   * Counts {@link KeeperState#Disconnected} state-change notifications. The count is updated from
+   * zkclient callbacks and polled from the test thread, so it is kept in an atomic.
+   */
+  static class ZkStateCountListener implements IZkStateListener {
+    final AtomicInteger count = new AtomicInteger(0);
+
+    @Override
+    public void handleStateChanged(KeeperState state) throws Exception {
+      if (state == KeeperState.Disconnected) {
+        count.incrementAndGet();
+      }
+    }
+
+    @Override
+    public void handleNewSession() throws Exception {
+    }
+  }
+
+  /**
+   * Fire {@code n} simulated Disconnected events at {@code client}, one at a time. After each one,
+   * wait (up to 30s) until {@code listener} has counted it, and check the zk session id is unchanged.
+   * @param client zkclient that receives the simulated events
+   * @param listener a fresh listener (count 0) already subscribed to {@code client}
+   * @param n number of Disconnected events to fire
+   */
+  private static void simulateDisconnects(ZkClient client, final ZkStateCountListener listener,
+      int n) throws Exception {
+    for (int i = 0; i < n; i++) {
+      String oldSessionId = ZkTestHelper.getSessionId(client);
+      ZkTestHelper.simulateZkStateDisconnected(client);
+      final int expectDisconnectCnt = i + 1;
+      // wait until we get invoked by zk state change to disconnected
+      boolean observed = TestHelper.verify(new Verifier() {
+
+        @Override
+        public boolean verify() throws Exception {
+          return listener.count.get() == expectDisconnectCnt;
+        }
+      }, 30 * 1000);
+      Assert.assertTrue(observed, "Disconnect event #" + expectDisconnectCnt
+          + " should be delivered to the state listener");
+
+      String newSessionId = ZkTestHelper.getSessionId(client);
+      Assert.assertEquals(newSessionId, oldSessionId);
+    }
+  }
+
+  @Test
+  public void testParticipantFlapping() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        32, // partitions per resource
+        1, // number of nodes
+        1, // replicas
+        "MasterSlave", false);
+
+    final String instanceName = "localhost_12918";
+    MockParticipant participant = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+    participant.syncStart();
+
+    final ZkClient client = participant.getManager().getZkClient();
+    final ZkStateCountListener listener = new ZkStateCountListener();
+    client.subscribeStateChanges(listener);
+
+    final int n = ZKHelixManager.MAX_DISCONNECT_THRESHOLD;
+    simulateDisconnects(client, listener, n);
+    client.unsubscribeStateChanges(listener);
+    // make sure participant is NOT disconnected
+    LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
+    Assert.assertNotNull(liveInstance, "Live-instance should exist after " + n + " disconnects");
+
+    // trigger flapping
+    ZkTestHelper.simulateZkStateDisconnected(client);
+    // wait until the manager reacts to the flapping by setting the zkclient's shutdown trigger
+    boolean success = TestHelper.verify(new Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        return client.getShutdownTrigger();
+      }
+    }, 30 * 1000);
+
+    Assert.assertTrue(success, "The " + (n + 1)
+        + "th disconnect event should trigger ZkHelixManager#disconnect");
+
+    // make sure participant is disconnected
+    success = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
+        return liveInstance == null;
+      }
+    }, 3 * 1000);
+    Assert.assertTrue(success, "Live-instance should be gone after " + (n + 1) + " disconnects");
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testControllerFlapping() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        32, // partitions per resource
+        1, // number of nodes
+        1, // replicas
+        "MasterSlave", false);
+
+    ClusterController controller = new ClusterController(clusterName, "controller", ZK_ADDR);
+    controller.syncStart();
+
+    final ZkClient client = controller.getManager().getZkClient();
+    final ZkStateCountListener listener = new ZkStateCountListener();
+    client.subscribeStateChanges(listener);
+
+    final int n = ZKHelixManager.MAX_DISCONNECT_THRESHOLD;
+    simulateDisconnects(client, listener, n);
+    client.unsubscribeStateChanges(listener);
+
+    // make sure controller is NOT disconnected
+    LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+    Assert.assertNotNull(leader, "Leader should exist after " + n + " disconnects");
+
+    // trigger flapping
+    ZkTestHelper.simulateZkStateDisconnected(client);
+    // wait until the manager reacts to the flapping by setting the zkclient's shutdown trigger
+    boolean success = TestHelper.verify(new Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        return client.getShutdownTrigger();
+      }
+    }, 30 * 1000);
+
+    Assert.assertTrue(success, "The " + (n + 1)
+        + "th disconnect event should trigger ZkHelixManager#disconnect");
+
+    // make sure controller is disconnected
+    success = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+        return leader == null;
+      }
+    }, 5 * 1000);
+    Assert.assertTrue(success, "Leader should be gone after " + (n + 1) + " disconnects");
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}