You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/11/16 01:29:50 UTC

git commit: [HELIX-264] fix zkclient#close() bug, port fix to 0.6.1.5-release

Updated Branches:
  refs/heads/helix-0.6.1.5-release fa7597e2d -> 58a7887e5


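[HELIX-264] fix ZkClient#close() when called from ZkClient's own event thread (it threw ZkInterruptedException and skipped closing the zk connection), port fix to 0.6.1.5-release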


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/58a7887e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/58a7887e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/58a7887e

Branch: refs/heads/helix-0.6.1.5-release
Commit: 58a7887e5aac4b6c7da55526a5559cf976e2b6ae
Parents: fa7597e
Author: zzhang <zz...@apache.org>
Authored: Fri Oct 4 11:20:17 2013 -0700
Committer: zzhang <zz...@apache.org>
Committed: Fri Nov 15 14:44:07 2013 -0800

----------------------------------------------------------------------
 .../apache/helix/manager/zk/ZKHelixManager.java |  77 +++---
 .../org/apache/helix/manager/zk/ZkClient.java   |  44 ++-
 .../java/org/apache/helix/ZkTestHelper.java     |  58 ++--
 .../TestExceptionInStateModelReset.java         | 145 ++++++++++
 .../apache/helix/manager/zk/TestZkFlapping.java | 272 +++++++++++++++++++
 5 files changed, 537 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/58a7887e/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index eec38aa..4e72769 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -429,11 +429,6 @@ public class ZKHelixManager implements HelixManager
   @Override
   public void disconnect()
   {
-    if (!isConnected())
-    {
-      logger.error("ClusterManager " + _instanceName + " already disconnected");
-      return;
-    }
     disconnectInternal();
   }
   
@@ -444,45 +439,51 @@ public class ZKHelixManager implements HelixManager
     logger.info("disconnect " + _instanceName + "(" + _instanceType + ") from "
         + _clusterName);
 
-    /**
-     * shutdown thread pool first to avoid reset() being invoked in the middle of state
-     * transition
-     */
-    _messagingService.getExecutor().shutdown();
-    resetHandlers();
-
-    _helixAccessor.shutdown();
-
-    if (_leaderElectionHandler != null)
-    {
-      _leaderElectionHandler.reset();
-    }
-
-    if (_participantHealthCheckInfoCollector != null)
+    try 
     {
-      _participantHealthCheckInfoCollector.stop();
+      /**
+       * shutdown thread pool first to avoid reset() being invoked in the middle of state
+       * transition
+       */
+      _messagingService.getExecutor().shutdown();
+      resetHandlers();
+  
+      _helixAccessor.shutdown();
+  
+      if (_leaderElectionHandler != null)
+      {
+        _leaderElectionHandler.reset();
+      }
+  
+      if (_participantHealthCheckInfoCollector != null)
+      {
+        _participantHealthCheckInfoCollector.stop();
+      }
+  
+      if (_timer != null)
+      {
+        _timer.cancel();
+        _timer = null;
+      }
+  
+      if (_instanceType == InstanceType.CONTROLLER)
+      {
+        stopTimerTasks();
+      }
     }
-
-    if (_timer != null)
+    catch (Exception e)
     {
-      _timer.cancel();
-      _timer = null;
+      logger.error("Exception in disconnecting " + _instanceName + "(" + _instanceType + ") from "
+        + _clusterName, e);
     }
-
-    if (_instanceType == InstanceType.CONTROLLER)
+    finally
     {
-      stopTimerTasks();
-    }
-
-    // unsubscribe accessor from controllerChange
-    _zkClient.unsubscribeAll();
-
-    _zkClient.close();
-
-    // HACK seems that zkClient is not sending DISCONNECT event
-    _zkStateChangeListener.disconnect();
-    logger.info("Cluster manager: " + _instanceName + " disconnected");
+      _zkClient.close();
   
+      // HACK seems that zkClient is not sending DISCONNECT event
+      _zkStateChangeListener.disconnect();
+      logger.info("Cluster manager: " + _instanceName + " disconnected");
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/58a7887e/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index 41fde0c..d9823af 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -67,8 +67,7 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient
     if(LOG.isTraceEnabled())
     {
       StackTraceElement[] calls = Thread.currentThread().getStackTrace();
-      int min = Math.min(calls.length, 5);
-      LOG.trace("creating a zkclient. callstack: " + Arrays.asList(calls).subList(0, min));
+      LOG.trace("creating a zkclient. callstack: " + Arrays.asList(calls));
     }
   }
 
@@ -142,10 +141,45 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient
     if(LOG.isTraceEnabled())
     {
       StackTraceElement[] calls = Thread.currentThread().getStackTrace();
-      int min = Math.min(calls.length, 5);
-      LOG.trace("closing a zkclient. callStack: " + Arrays.asList(calls).subList(0, min));
+      LOG.trace("closing a zkclient. callStack: " + Arrays.asList(calls));
+    }
+
+    getEventLock().lock();
+    try {
+      if (_connection == null) {
+        return;
+      }
+
+      LOG.info("Closing zkclient: " + ((ZkConnection) _connection).getZookeeper());
+      super.close();
+    } catch (ZkInterruptedException e) {
+      /**
+       * HELIX-264: calling ZkClient#close() in its own eventThread context will
+       * throw ZkInterruptedException and skip ZkConnection#close()
+       */
+      if (_connection != null) {
+        try {
+          /**
+           * ZkInterruptedException#construct() honors InterruptedException by calling
+           * Thread.currentThread().interrupt(); clear it first, so we can safely close the
+           * zk-connection
+           */
+          Thread.interrupted();
+          _connection.close();
+          _connection = null;
+
+          /**
+           * restore interrupted status of current thread
+           */
+          Thread.currentThread().interrupt();
+        } catch (InterruptedException e1) {
+          throw new ZkInterruptedException(e1);
+        }
+      }
+    } finally {
+      getEventLock().unlock();
+      LOG.info("Closed zkclient");
     }
-    super.close();
   }
 
   public Stat getStat(final String path)

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/58a7887e/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index 37b0664..89e4b83 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -20,6 +20,7 @@ package org.apache.helix;
  */
 
 import java.io.BufferedReader;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.net.Socket;
@@ -43,9 +44,11 @@ import org.apache.helix.model.ExternalView;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
+import org.testng.Assert;
 
 
 public class ZkTestHelper
@@ -116,16 +119,38 @@ public class ZkTestHelper
     LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
   }
 
-  public static void expireSession(final ZkClient zkClient) throws Exception
-  {
-    final CountDownLatch waitExpire = new CountDownLatch(1);
+  /**
+   * Simulate a zk Disconnected state change by handing a synthetic event straight to
+   * {@link ZkClient#process(WatchedEvent)}; the underlying ZooKeeper handle is not touched.
+   */
+  public static void simulateZkStateDisconnected(ZkClient client) {
+    client.process(new WatchedEvent(EventType.None, KeeperState.Disconnected, null));
+  }
+
+  /**
+   * Get the id of the zk session currently held by a client.
+   * @param client zk client whose underlying ZkConnection is inspected
+   * @return the current session id, formatted with Long#toHexString
+   */
+  public static String getSessionId(ZkClient client) {
+    ZooKeeper zookeeper = ((ZkConnection) client.getConnection()).getZookeeper();
+    long sessionId = zookeeper.getSessionId();
+    return Long.toHexString(sessionId);
+  }
+
+  /**
+   * Expire current zk session and wait for {@link IZkStateListener#handleNewSession()} invoked
+   * @param zkClient
+   * @throws Exception
+   */
+  public static void expireSession(final ZkClient zkClient) throws Exception {
+    final CountDownLatch waitNewSession = new CountDownLatch(1);
 
     IZkStateListener listener = new IZkStateListener()
     {
       @Override
-      public void handleStateChanged(KeeperState state) throws Exception
-      {
-//         System.err.println("handleStateChanged. state: " + state);
+      public void handleStateChanged(KeeperState state) throws Exception {
+        LOG.info("IZkStateListener#handleStateChanged, state: " + state);
       }
 
       @Override
@@ -137,9 +162,8 @@ public class ZkTestHelper
         ZkConnection connection = ((ZkConnection) zkClient.getConnection());
         ZooKeeper curZookeeper = connection.getZookeeper();
 
-        LOG.info("handleNewSession. sessionId: "
-            + Long.toHexString(curZookeeper.getSessionId()));
-        waitExpire.countDown();
+        LOG.info("handleNewSession. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
+        waitNewSession.countDown();
       }
     };
 
@@ -147,14 +171,14 @@ public class ZkTestHelper
 
     ZkConnection connection = ((ZkConnection) zkClient.getConnection());
     ZooKeeper curZookeeper = connection.getZookeeper();
-    LOG.info("Before expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
+    String oldSessionId = Long.toHexString(curZookeeper.getSessionId());
+    LOG.info("Before session expiry. sessionId: " + oldSessionId + ", zk: " + curZookeeper);
 
     Watcher watcher = new Watcher()
     {
       @Override
-      public void process(WatchedEvent event)
-      {
-        LOG.info("Process watchEvent: " + event);
+      public void process(WatchedEvent event) {
+        LOG.info("Watcher#process, event: " + event);
       }
     };
 
@@ -169,17 +193,19 @@ public class ZkTestHelper
     {
       Thread.sleep(10);
     }
+    Assert.assertEquals(dupZookeeper.getState(), States.CONNECTED, "Fail to connect to zk using current session info");
     dupZookeeper.close();
 
     // make sure session expiry really happens
-    waitExpire.await();
+    waitNewSession.await();
     zkClient.unsubscribeStateChanges(listener);
 
     connection = (ZkConnection) zkClient.getConnection();
     curZookeeper = connection.getZookeeper();
 
-    // System.err.println("zk: " + oldZookeeper);
-    LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
+    String newSessionId = Long.toHexString(curZookeeper.getSessionId());
+    LOG.info("After session expiry. sessionId: " + newSessionId + ", zk: " + curZookeeper);
+    Assert.assertNotSame(newSessionId, oldSessionId, "Fail to expire current session, zk: " + curZookeeper);
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/58a7887e/helix-core/src/test/java/org/apache/helix/integration/TestExceptionInStateModelReset.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestExceptionInStateModelReset.java b/helix-core/src/test/java/org/apache/helix/integration/TestExceptionInStateModelReset.java
new file mode 100644
index 0000000..a5512c1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestExceptionInStateModelReset.java
@@ -0,0 +1,145 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that stopping a participant removes its live-instance even if StateModel#reset() throws.
+ */
+public class TestExceptionInStateModelReset extends ZkUnitTestBase
+{
+  private static final Logger LOG = Logger.getLogger(TestExceptionInStateModelReset.class);
+
+  @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "SLAVE", "ERROR" })
+  public static class TestMSStateModel extends StateModel
+  {
+    @Transition(to = "SLAVE", from = "OFFLINE")
+    public void onBecomeSlaveFromOffline(Message message, NotificationContext context) throws InterruptedException
+    {
+      LOG.info("Become SLAVE from OFFLINE");
+    }
+
+    @Transition(to = "MASTER", from = "SLAVE")
+    public void onBecomeMasterFromSlave(Message message, NotificationContext context) throws InterruptedException
+    {
+      LOG.info("Become MASTER from SLAVE");
+    }
+
+    @Transition(to = "SLAVE", from = "MASTER")
+    public void onBecomeSlaveFromMaster(Message message, NotificationContext context) throws InterruptedException
+    {
+      LOG.info("Become SLAVE from MASTER");
+    }
+
+    @Transition(to = "OFFLINE", from = "SLAVE")
+    public void onBecomeOfflineFromSlave(Message message, NotificationContext context) throws InterruptedException
+    {
+      LOG.info("Become OFFLINE from SLAVE");
+    }
+
+    @Override
+    public void reset()
+    {
+      LOG.warn("Reset");
+      throw new RuntimeException("IGNORABLE: test throwing exception from StateModel#reset()");
+    }
+  }
+
+  public static class TestMSStateModelFactory extends StateModelFactory<TestMSStateModel>
+  {
+    @Override
+    public TestMSStateModel createNewStateModel(String partitionName)
+    {
+      return new TestMSStateModel();
+    }
+  }
+
+  @Test
+  public void test() throws Exception
+  {
+    // participant 0 uses TestMSStateModel, whose reset() always throws
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            32, // partitions per resource
+                            n, // number of nodes
+                            2, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    ClusterController controller =
+        new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    // start participants
+    MockParticipant[] participants = new MockParticipant[n];
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+
+      if (i == 0)
+      {
+        StateModelFactory factory = new TestMSStateModelFactory();
+        participants[i] = new MockParticipant(factory, clusterName, instanceName, ZK_ADDR, null);
+      }
+      else
+      {
+        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      }
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // stop participant[0], will throw runtime exception in reset()
+    participants[0].syncStop();
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(participants[0].getInstanceName()));
+    Assert.assertNull(liveInstance, "Live-instance should be removed even though StateModel#reset() throws");
+
+    // clean up
+    // wait for all zk callbacks done
+    Thread.sleep(1000);
+    controller.syncStop();
+    for (int i = 1; i < n; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/58a7887e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
new file mode 100644
index 0000000..5b35148
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
@@ -0,0 +1,272 @@
+package org.apache.helix.manager.zk;
+
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.Verifier;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.model.LiveInstance;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests ZkClient#close() and ZKHelixManager's handling of zk session expiry and flapping.
+ */
+public class TestZkFlapping extends ZkUnitTestBase {
+
+  @Test
+  public void testZkSessionExpiry() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    ZkClient client =
+        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
+            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+
+    String path = String.format("/%s", clusterName);
+    client.createEphemeral(path);
+    String oldSessionId = ZkTestHelper.getSessionId(client);
+    ZkTestHelper.expireSession(client);
+    String newSessionId = ZkTestHelper.getSessionId(client);
+    Assert.assertFalse(newSessionId.equals(oldSessionId), "Session id should change after session expiry");
+    Assert.assertFalse(client.exists(path), "Ephemeral znode should be gone after session expiry");
+    client.close();
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testCloseZkClient() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    ZkClient client =
+        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
+            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+    String path = String.format("/%s", clusterName);
+    client.createEphemeral(path);
+
+    client.close();
+    Assert.assertFalse(_gZkClient.exists(path), "Ephemeral node: " + path
+        + " should be removed after ZkClient#close()");
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testCloseZkClientInZkClientEventThread() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    final CountDownLatch waitCallback = new CountDownLatch(1);
+    final ZkClient client =
+        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
+            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+    String path = String.format("/%s", clusterName);
+    client.createEphemeral(path);
+    client.subscribeDataChanges(path, new IZkDataListener() {
+      @Override
+      public void handleDataDeleted(String dataPath) throws Exception {
+      }
+
+      @Override
+      public void handleDataChange(String dataPath, Object data) throws Exception {
+        // listener callbacks run on ZkClient's own event thread: the HELIX-264 scenario
+        client.close();
+        waitCallback.countDown();
+      }
+    });
+
+    client.writeData(path, new ZNRecord("test"));
+    Assert.assertTrue(waitCallback.await(30, TimeUnit.SECONDS), "Timed out waiting for data-change callback");
+    Assert.assertFalse(_gZkClient.exists(path), "Ephemeral node: " + path
+        + " should be removed after ZkClient#close() in its own event-thread");
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /** Counts Disconnected events; volatile: updated by ZkClient callbacks, polled by the test thread. */
+  static class ZkStateCountListener implements IZkStateListener {
+    volatile int count = 0;
+
+    @Override
+    public void handleStateChanged(KeeperState state) throws Exception {
+      if (state == KeeperState.Disconnected) {
+        count++;
+      }
+    }
+
+    @Override
+    public void handleNewSession() throws Exception {
+    }
+  }
+
+  @Test
+  public void testParticipantFlapping() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        32, // partitions per resource
+        1, // number of nodes
+        1, // replicas
+        "MasterSlave", false);
+
+    final String instanceName = "localhost_12918";
+    MockParticipant participant = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+    participant.syncStart();
+
+    final ZkClient client = participant.getManager().getZkClient();
+    final ZkStateCountListener listener = new ZkStateCountListener();
+    client.subscribeStateChanges(listener);
+
+    final AtomicInteger expectDisconnectCnt = new AtomicInteger(0);
+    final int n = ZKHelixManager.MAX_DISCONNECT_THRESHOLD;
+    for (int i = 0; i < n; i++) {
+      String oldSessionId = ZkTestHelper.getSessionId(client);
+      ZkTestHelper.simulateZkStateDisconnected(client);
+      expectDisconnectCnt.incrementAndGet();
+      // wait until we get invoked by zk state change to disconnected
+      boolean disconnected = TestHelper.verify(new Verifier() {
+        @Override
+        public boolean verify() throws Exception {
+          return listener.count == expectDisconnectCnt.get();
+        }
+      }, 30 * 1000);
+      Assert.assertTrue(disconnected, "Disconnected event #" + (i + 1) + " was not delivered");
+
+      String newSessionId = ZkTestHelper.getSessionId(client);
+      Assert.assertEquals(newSessionId, oldSessionId);
+    }
+    client.unsubscribeStateChanges(listener);
+    // make sure participant is NOT disconnected
+    LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
+    Assert.assertNotNull(liveInstance, "Live-instance should exist after " + n + " disconnects");
+
+    // trigger flapping
+    ZkTestHelper.simulateZkStateDisconnected(client);
+    // wait until the manager has closed its zk client (shutdown trigger set)
+    boolean success = TestHelper.verify(new Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        return client.getShutdownTrigger();
+      }
+    }, 30 * 1000);
+
+    Assert.assertTrue(success, "The " + (n + 1)
+        + "th disconnect event should trigger ZKHelixManager#disconnect");
+
+    // make sure participant is disconnected
+    success = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
+        return liveInstance == null;
+      }
+    }, 3 * 1000);
+    Assert.assertTrue(success, "Live-instance should be gone after " + (n + 1) + " disconnects");
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testControllerFlapping() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        32, // partitions per resource
+        1, // number of nodes
+        1, // replicas
+        "MasterSlave", false);
+
+    ClusterController controller = new ClusterController(clusterName, "controller", ZK_ADDR);
+    controller.syncStart();
+
+    final ZkClient client = controller.getManager().getZkClient();
+    final ZkStateCountListener listener = new ZkStateCountListener();
+    client.subscribeStateChanges(listener);
+
+    final AtomicInteger expectDisconnectCnt = new AtomicInteger(0);
+    final int n = ZKHelixManager.MAX_DISCONNECT_THRESHOLD;
+    for (int i = 0; i < n; i++) {
+      String oldSessionId = ZkTestHelper.getSessionId(client);
+      ZkTestHelper.simulateZkStateDisconnected(client);
+      expectDisconnectCnt.incrementAndGet();
+      // wait until we get invoked by zk state change to disconnected
+      boolean disconnected = TestHelper.verify(new Verifier() {
+        @Override
+        public boolean verify() throws Exception {
+          return listener.count == expectDisconnectCnt.get();
+        }
+      }, 30 * 1000);
+      Assert.assertTrue(disconnected, "Disconnected event #" + (i + 1) + " was not delivered");
+
+      String newSessionId = ZkTestHelper.getSessionId(client);
+      Assert.assertEquals(newSessionId, oldSessionId);
+    }
+    client.unsubscribeStateChanges(listener);
+    // make sure controller is NOT disconnected
+    LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+    Assert.assertNotNull(leader, "Leader should exist after " + n + " disconnects");
+
+    // trigger flapping
+    ZkTestHelper.simulateZkStateDisconnected(client);
+    // wait until the manager has closed its zk client (shutdown trigger set)
+    boolean success = TestHelper.verify(new Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        return client.getShutdownTrigger();
+      }
+    }, 30 * 1000);
+
+    Assert.assertTrue(success, "The " + (n + 1)
+        + "th disconnect event should trigger ZKHelixManager#disconnect");
+
+    // make sure controller is disconnected
+    success = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+        return leader == null;
+      }
+    }, 5 * 1000);
+    Assert.assertTrue(success, "Leader should be gone after " + (n + 1) + " disconnects");
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}