You are viewing a plain-text version of this content. The hyperlink to its canonical version is not preserved in this rendering.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/11/16 01:29:50 UTC
git commit: [HELIX-264] fix zkclient#close() bug, port fix to 0.6.1.5-release
Updated Branches:
refs/heads/helix-0.6.1.5-release fa7597e2d -> 58a7887e5
[HELIX-264] fix zkclient#close() bug, port fix to 0.6.1.5-release
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/58a7887e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/58a7887e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/58a7887e
Branch: refs/heads/helix-0.6.1.5-release
Commit: 58a7887e5aac4b6c7da55526a5559cf976e2b6ae
Parents: fa7597e
Author: zzhang <zz...@apache.org>
Authored: Fri Oct 4 11:20:17 2013 -0700
Committer: zzhang <zz...@apache.org>
Committed: Fri Nov 15 14:44:07 2013 -0800
----------------------------------------------------------------------
.../apache/helix/manager/zk/ZKHelixManager.java | 77 +++---
.../org/apache/helix/manager/zk/ZkClient.java | 44 ++-
.../java/org/apache/helix/ZkTestHelper.java | 58 ++--
.../TestExceptionInStateModelReset.java | 145 ++++++++++
.../apache/helix/manager/zk/TestZkFlapping.java | 272 +++++++++++++++++++
5 files changed, 537 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/58a7887e/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index eec38aa..4e72769 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -429,11 +429,6 @@ public class ZKHelixManager implements HelixManager
@Override
public void disconnect()
{
- if (!isConnected())
- {
- logger.error("ClusterManager " + _instanceName + " already disconnected");
- return;
- }
disconnectInternal();
}
@@ -444,45 +439,51 @@ public class ZKHelixManager implements HelixManager
logger.info("disconnect " + _instanceName + "(" + _instanceType + ") from "
+ _clusterName);
- /**
- * shutdown thread pool first to avoid reset() being invoked in the middle of state
- * transition
- */
- _messagingService.getExecutor().shutdown();
- resetHandlers();
-
- _helixAccessor.shutdown();
-
- if (_leaderElectionHandler != null)
- {
- _leaderElectionHandler.reset();
- }
-
- if (_participantHealthCheckInfoCollector != null)
+ try
{
- _participantHealthCheckInfoCollector.stop();
+ /**
+ * shutdown thread pool first to avoid reset() being invoked in the middle of state
+ * transition
+ */
+ _messagingService.getExecutor().shutdown();
+ resetHandlers();
+
+ _helixAccessor.shutdown();
+
+ if (_leaderElectionHandler != null)
+ {
+ _leaderElectionHandler.reset();
+ }
+
+ if (_participantHealthCheckInfoCollector != null)
+ {
+ _participantHealthCheckInfoCollector.stop();
+ }
+
+ if (_timer != null)
+ {
+ _timer.cancel();
+ _timer = null;
+ }
+
+ if (_instanceType == InstanceType.CONTROLLER)
+ {
+ stopTimerTasks();
+ }
}
-
- if (_timer != null)
+ catch (Exception e)
{
- _timer.cancel();
- _timer = null;
+ logger.error("Exception in disconnecting " + _instanceName + "(" + _instanceType + ") from "
+ + _clusterName, e);
}
-
- if (_instanceType == InstanceType.CONTROLLER)
+ finally
{
- stopTimerTasks();
- }
-
- // unsubscribe accessor from controllerChange
- _zkClient.unsubscribeAll();
-
- _zkClient.close();
-
- // HACK seems that zkClient is not sending DISCONNECT event
- _zkStateChangeListener.disconnect();
- logger.info("Cluster manager: " + _instanceName + " disconnected");
+ _zkClient.close();
+ // HACK seems that zkClient is not sending DISCONNECT event
+ _zkStateChangeListener.disconnect();
+ logger.info("Cluster manager: " + _instanceName + " disconnected");
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/58a7887e/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index 41fde0c..d9823af 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -67,8 +67,7 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient
if(LOG.isTraceEnabled())
{
StackTraceElement[] calls = Thread.currentThread().getStackTrace();
- int min = Math.min(calls.length, 5);
- LOG.trace("creating a zkclient. callstack: " + Arrays.asList(calls).subList(0, min));
+ LOG.trace("creating a zkclient. callstack: " + Arrays.asList(calls));
}
}
@@ -142,10 +141,45 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient
if(LOG.isTraceEnabled())
{
StackTraceElement[] calls = Thread.currentThread().getStackTrace();
- int min = Math.min(calls.length, 5);
- LOG.trace("closing a zkclient. callStack: " + Arrays.asList(calls).subList(0, min));
+ LOG.trace("closing a zkclient. callStack: " + Arrays.asList(calls));
+ }
+
+ getEventLock().lock();
+ try {
+ if (_connection == null) {
+ return;
+ }
+
+ LOG.info("Closing zkclient: " + ((ZkConnection) _connection).getZookeeper());
+ super.close();
+ } catch (ZkInterruptedException e) {
+ /**
+ * HELIX-264: calling ZkClient#close() in its own eventThread context will
+ * throw ZkInterruptedException and skip ZkConnection#close()
+ */
+ if (_connection != null) {
+ try {
+ /**
+ * ZkInterruptedException#construct() honors InterruptedException by calling
+ * Thread.currentThread().interrupt(); clear it first, so we can safely close the
+ * zk-connection
+ */
+ Thread.interrupted();
+ _connection.close();
+ _connection = null;
+
+ /**
+ * restore interrupted status of current thread
+ */
+ Thread.currentThread().interrupt();
+ } catch (InterruptedException e1) {
+ throw new ZkInterruptedException(e1);
+ }
+ }
+ } finally {
+ getEventLock().unlock();
+ LOG.info("Closed zkclient");
}
- super.close();
}
public Stat getStat(final String path)
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/58a7887e/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index 37b0664..89e4b83 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -20,6 +20,7 @@ package org.apache.helix;
*/
import java.io.BufferedReader;
+import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
@@ -43,9 +44,11 @@ import org.apache.helix.model.ExternalView;
import org.apache.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
+import org.testng.Assert;
public class ZkTestHelper
@@ -116,16 +119,38 @@ public class ZkTestHelper
LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
}
- public static void expireSession(final ZkClient zkClient) throws Exception
- {
- final CountDownLatch waitExpire = new CountDownLatch(1);
+ /**
+ * Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly
+ */
+ public static void simulateZkStateDisconnected(ZkClient client) {
+ WatchedEvent event = new WatchedEvent(EventType.None, KeeperState.Disconnected, null);
+ client.process(event);
+ }
+
+ /**
+ * Get zk connection session id
+ * @param client
+ * @return
+ */
+ public static String getSessionId(ZkClient client) {
+ ZkConnection connection = ((ZkConnection) client.getConnection());
+ ZooKeeper curZookeeper = connection.getZookeeper();
+ return Long.toHexString(curZookeeper.getSessionId());
+ }
+
+ /**
+ * Expire current zk session and wait for {@link IZkStateListener#handleNewSession()} invoked
+ * @param zkClient
+ * @throws Exception
+ */
+ public static void expireSession(final ZkClient zkClient) throws Exception {
+ final CountDownLatch waitNewSession = new CountDownLatch(1);
IZkStateListener listener = new IZkStateListener()
{
@Override
- public void handleStateChanged(KeeperState state) throws Exception
- {
-// System.err.println("handleStateChanged. state: " + state);
+ public void handleStateChanged(KeeperState state) throws Exception {
+ LOG.info("IZkStateListener#handleStateChanged, state: " + state);
}
@Override
@@ -137,9 +162,8 @@ public class ZkTestHelper
ZkConnection connection = ((ZkConnection) zkClient.getConnection());
ZooKeeper curZookeeper = connection.getZookeeper();
- LOG.info("handleNewSession. sessionId: "
- + Long.toHexString(curZookeeper.getSessionId()));
- waitExpire.countDown();
+ LOG.info("handleNewSession. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
+ waitNewSession.countDown();
}
};
@@ -147,14 +171,14 @@ public class ZkTestHelper
ZkConnection connection = ((ZkConnection) zkClient.getConnection());
ZooKeeper curZookeeper = connection.getZookeeper();
- LOG.info("Before expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
+ String oldSessionId = Long.toHexString(curZookeeper.getSessionId());
+ LOG.info("Before session expiry. sessionId: " + oldSessionId + ", zk: " + curZookeeper);
Watcher watcher = new Watcher()
{
@Override
- public void process(WatchedEvent event)
- {
- LOG.info("Process watchEvent: " + event);
+ public void process(WatchedEvent event) {
+ LOG.info("Watcher#process, event: " + event);
}
};
@@ -169,17 +193,19 @@ public class ZkTestHelper
{
Thread.sleep(10);
}
+ Assert.assertEquals(dupZookeeper.getState(), States.CONNECTED, "Fail to connect to zk using current session info");
dupZookeeper.close();
// make sure session expiry really happens
- waitExpire.await();
+ waitNewSession.await();
zkClient.unsubscribeStateChanges(listener);
connection = (ZkConnection) zkClient.getConnection();
curZookeeper = connection.getZookeeper();
- // System.err.println("zk: " + oldZookeeper);
- LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
+ String newSessionId = Long.toHexString(curZookeeper.getSessionId());
+ LOG.info("After session expiry. sessionId: " + newSessionId + ", zk: " + curZookeeper);
+ Assert.assertNotSame(newSessionId, oldSessionId, "Fail to expire current session, zk: " + curZookeeper);
}
/*
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/58a7887e/helix-core/src/test/java/org/apache/helix/integration/TestExceptionInStateModelReset.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestExceptionInStateModelReset.java b/helix-core/src/test/java/org/apache/helix/integration/TestExceptionInStateModelReset.java
new file mode 100644
index 0000000..a5512c1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestExceptionInStateModelReset.java
@@ -0,0 +1,145 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that stopping a participant whose {@link StateModel#reset()} throws still removes
+ * that participant's live instance.
+ */
+public class TestExceptionInStateModelReset extends ZkUnitTestBase {
+  private static final Logger LOG = Logger.getLogger(TestExceptionInStateModelReset.class);
+
+  /**
+   * Master-slave state model whose transitions only log and whose {@link #reset()} always
+   * throws.
+   */
+  @StateModelInfo(initialState = "OFFLINE", states = { "MASTER", "SLAVE", "ERROR" })
+  public static class TestMSStateModel extends StateModel {
+    @Transition(to = "SLAVE", from = "OFFLINE")
+    public void onBecomeSlaveFromOffline(Message message, NotificationContext context)
+        throws InterruptedException {
+      LOG.info("Become SLAVE from OFFLINE");
+    }
+
+    @Transition(to = "MASTER", from = "SLAVE")
+    public void onBecomeMasterFromSlave(Message message, NotificationContext context)
+        throws InterruptedException {
+      LOG.info("Become MASTER from SLAVE");
+    }
+
+    @Transition(to = "SLAVE", from = "MASTER")
+    public void onBecomeSlaveFromMaster(Message message, NotificationContext context)
+        throws InterruptedException {
+      LOG.info("Become SLAVE from MASTER");
+    }
+
+    @Transition(to = "OFFLINE", from = "SLAVE")
+    public void onBecomeOfflineFromSlave(Message message, NotificationContext context)
+        throws InterruptedException {
+      LOG.info("Become OFFLINE from SLAVE");
+    }
+
+    /** Always throws, so that stopping the participant hits a failing reset(). */
+    @Override
+    public void reset() {
+      LOG.warn("Reset");
+      throw new RuntimeException("IGNORABLE: test throwing exception from StateModel#reset()");
+    }
+  }
+
+  /** Creates a new {@link TestMSStateModel} for every partition. */
+  public static class TestMSStateModelFactory extends StateModelFactory<TestMSStateModel> {
+    @Override
+    public TestMSStateModel createNewStateModel(String partitionName) {
+      return new TestMSStateModel();
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        32, // partitions per resource
+        n, // number of nodes
+        2, // replicas
+        "MasterSlave",
+        true); // do rebalance
+
+    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller.syncStart();
+
+    MockParticipant[] participants = new MockParticipant[n];
+    // index of the first participant that the clean-up below still has to stop
+    int firstRunning = 0;
+    try {
+      // start participants; only participant 0 uses the state model whose reset() throws
+      for (int i = 0; i < n; i++) {
+        String instanceName = "localhost_" + (12918 + i);
+        if (i == 0) {
+          StateModelFactory<TestMSStateModel> factory = new TestMSStateModelFactory();
+          participants[i] = new MockParticipant(factory, clusterName, instanceName, ZK_ADDR, null);
+        } else {
+          participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+        }
+        participants[i].syncStart();
+      }
+
+      boolean result = ClusterStateVerifier.verifyByZkCallback(
+          new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+      Assert.assertTrue(result);
+
+      // stop participant[0], will throw runtime exception in reset()
+      firstRunning = 1; // never stop participant 0 a second time, even if syncStop() throws
+      participants[0].syncStop();
+      HelixDataAccessor accessor =
+          new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      LiveInstance liveInstance =
+          accessor.getProperty(keyBuilder.liveInstance(participants[0].getInstanceName()));
+      Assert.assertNull(liveInstance);
+
+      // wait for all zk callbacks done
+      Thread.sleep(1000);
+    } finally {
+      // clean up even if an assertion above failed, so no manager leaks into later tests
+      controller.syncStop();
+      for (int i = firstRunning; i < n; i++) {
+        if (participants[i] != null) {
+          participants[i].syncStop();
+        }
+      }
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/58a7887e/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
new file mode 100644
index 0000000..5b35148
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
@@ -0,0 +1,272 @@
+package org.apache.helix.manager.zk;
+
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.Verifier;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.model.LiveInstance;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests ZkClient close/session-expiry handling and ZKHelixManager disconnect on zk flapping.
+ */
+public class TestZkFlapping extends ZkUnitTestBase {
+
+  @Test
+  public void testZkSessionExpiry() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    ZkClient client =
+        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
+            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+
+    String path = String.format("/%s", clusterName);
+    client.createEphemeral(path);
+    String oldSessionId = ZkTestHelper.getSessionId(client);
+    ZkTestHelper.expireSession(client);
+    String newSessionId = ZkTestHelper.getSessionId(client);
+    Assert.assertFalse(newSessionId.equals(oldSessionId), "Session id should change after expiry");
+    Assert.assertFalse(client.exists(path), "Ephemeral znode should be gone after session expiry");
+    client.close();
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testCloseZkClient() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    ZkClient client =
+        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
+            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+    String path = String.format("/%s", clusterName);
+    client.createEphemeral(path);
+
+    client.close();
+    Assert.assertFalse(_gZkClient.exists(path), "Ephemeral node: " + path
+        + " should be removed after ZkClient#close()");
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testCloseZkClientInZkClientEventThread() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    final CountDownLatch waitCallback = new CountDownLatch(1);
+    final ZkClient client =
+        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
+            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+    String path = String.format("/%s", clusterName);
+    client.createEphemeral(path);
+    client.subscribeDataChanges(path, new IZkDataListener() {
+      @Override
+      public void handleDataDeleted(String dataPath) throws Exception {
+      }
+
+      @Override
+      public void handleDataChange(String dataPath, Object data) throws Exception {
+        // close the client from inside its own event thread (the HELIX-264 scenario)
+        client.close();
+        waitCallback.countDown();
+      }
+    });
+
+    client.writeData(path, new ZNRecord("test"));
+    Assert.assertTrue(waitCallback.await(30, java.util.concurrent.TimeUnit.SECONDS),
+        "Data-change callback that closes the ZkClient was not invoked within 30s");
+    Assert.assertFalse(_gZkClient.exists(path), "Ephemeral node: " + path
+        + " should be removed after ZkClient#close() in its own event-thread");
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /** Counts Disconnected events; volatile because it is polled from the test thread. */
+  static class ZkStateCountListener implements IZkStateListener {
+    volatile int count = 0;
+
+    @Override
+    public void handleStateChanged(KeeperState state) throws Exception {
+      if (state == KeeperState.Disconnected) {
+        count++;
+      }
+    }
+
+    @Override
+    public void handleNewSession() throws Exception {
+    }
+  }
+
+  @Test
+  public void testParticipantFlapping() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        32, // partitions per resource
+        1, // number of nodes
+        1, // replicas
+        "MasterSlave", false);
+
+    final String instanceName = "localhost_12918";
+    MockParticipant participant = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+    participant.syncStart();
+
+    final ZkClient client = participant.getManager().getZkClient();
+    final ZkStateCountListener listener = new ZkStateCountListener();
+    client.subscribeStateChanges(listener);
+
+    final AtomicInteger expectDisconnectCnt = new AtomicInteger(0);
+    final int n = ZKHelixManager.MAX_DISCONNECT_THRESHOLD;
+    for (int i = 0; i < n; i++) {
+      String oldSessionId = ZkTestHelper.getSessionId(client);
+      ZkTestHelper.simulateZkStateDisconnected(client);
+      expectDisconnectCnt.incrementAndGet();
+      // wait until we get invoked by zk state change to disconnected
+      boolean observed = TestHelper.verify(new Verifier() {
+        @Override
+        public boolean verify() throws Exception {
+          return listener.count == expectDisconnectCnt.get();
+        }
+      }, 30 * 1000);
+      Assert.assertTrue(observed, "Disconnect event #" + (i + 1) + " was not delivered");
+
+      String newSessionId = ZkTestHelper.getSessionId(client);
+      Assert.assertEquals(newSessionId, oldSessionId);
+    }
+    client.unsubscribeStateChanges(listener);
+    // make sure participant is NOT disconnected
+    LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
+    Assert.assertNotNull(liveInstance, "Live-instance should exist after " + n + " disconnects");
+
+    // trigger flapping
+    ZkTestHelper.simulateZkStateDisconnected(client);
+    // wait until we get invoked by zk state change to disconnected
+    boolean success = TestHelper.verify(new Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        return client.getShutdownTrigger();
+      }
+    }, 30 * 1000);
+
+    Assert.assertTrue(success, "The " + (n + 1)
+        + "th disconnect event should trigger ZKHelixManager#disconnect");
+
+    // make sure participant is disconnected
+    success = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
+        return liveInstance == null;
+      }
+    }, 3 * 1000);
+    Assert.assertTrue(success, "Live-instance should be gone after " + (n + 1) + " disconnects");
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testControllerFlapping() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        32, // partitions per resource
+        1, // number of nodes
+        1, // replicas
+        "MasterSlave", false);
+
+    ClusterController controller = new ClusterController(clusterName, "controller", ZK_ADDR);
+    controller.syncStart();
+
+    final ZkClient client = controller.getManager().getZkClient();
+    final ZkStateCountListener listener = new ZkStateCountListener();
+    client.subscribeStateChanges(listener);
+
+    final AtomicInteger expectDisconnectCnt = new AtomicInteger(0);
+    final int n = ZKHelixManager.MAX_DISCONNECT_THRESHOLD;
+    for (int i = 0; i < n; i++) {
+      String oldSessionId = ZkTestHelper.getSessionId(client);
+      ZkTestHelper.simulateZkStateDisconnected(client);
+      expectDisconnectCnt.incrementAndGet();
+      // wait until we get invoked by zk state change to disconnected
+      boolean observed = TestHelper.verify(new Verifier() {
+        @Override
+        public boolean verify() throws Exception {
+          return listener.count == expectDisconnectCnt.get();
+        }
+      }, 30 * 1000);
+      Assert.assertTrue(observed, "Disconnect event #" + (i + 1) + " was not delivered");
+
+      String newSessionId = ZkTestHelper.getSessionId(client);
+      Assert.assertEquals(newSessionId, oldSessionId);
+    }
+    client.unsubscribeStateChanges(listener);
+    // make sure controller is NOT disconnected
+    LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+    Assert.assertNotNull(leader, "Leader should exist after " + n + " disconnects");
+
+    // trigger flapping
+    ZkTestHelper.simulateZkStateDisconnected(client);
+    // wait until we get invoked by zk state change to disconnected
+    boolean success = TestHelper.verify(new Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        return client.getShutdownTrigger();
+      }
+    }, 30 * 1000);
+
+    Assert.assertTrue(success, "The " + (n + 1)
+        + "th disconnect event should trigger ZKHelixManager#disconnect");
+
+    // make sure controller is disconnected
+    success = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+        return leader == null;
+      }
+    }, 5 * 1000);
+    Assert.assertTrue(success, "Leader should be gone after " + (n + 1) + " disconnects");
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}