You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text copy.)
Posted to commits@helix.apache.org by zz...@apache.org on 2013/10/18 23:28:55 UTC
git commit: [HELIX-264] fix zkclient#close() bug,
port fix to 0.6.2 release branch, rb=14483
Updated Branches:
refs/heads/helix-0.6.2-release a48b89e5a -> 0c3796b30
[HELIX-264] fix zkclient#close() bug, port fix to 0.6.2 release branch, rb=14483
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/0c3796b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/0c3796b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/0c3796b3
Branch: refs/heads/helix-0.6.2-release
Commit: 0c3796b304c1b26f5887073295faee324f8411fa
Parents: a48b89e
Author: zzhang <zz...@apache.org>
Authored: Fri Oct 18 14:28:43 2013 -0700
Committer: zzhang <zz...@apache.org>
Committed: Fri Oct 18 14:28:43 2013 -0700
----------------------------------------------------------------------
.../org/apache/helix/manager/zk/ZkClient.java | 41 ++-
.../java/org/apache/helix/ZkTestHelper.java | 48 +++-
.../apache/helix/manager/zk/TestZkFlapping.java | 272 +++++++++++++++++++
3 files changed, 348 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0c3796b3/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index 5b3af6d..7d300a5 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -62,8 +62,7 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
_zkSerializer = zkSerializer;
if (LOG.isTraceEnabled()) {
StackTraceElement[] calls = Thread.currentThread().getStackTrace();
- int min = Math.min(calls.length, 5);
- LOG.trace("creating a zkclient. callstack: " + Arrays.asList(calls).subList(0, min));
+ LOG.trace("created a zkclient. callstack: " + Arrays.asList(calls));
}
}
@@ -122,10 +121,42 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
public void close() throws ZkInterruptedException {
if (LOG.isTraceEnabled()) {
StackTraceElement[] calls = Thread.currentThread().getStackTrace();
- int min = Math.min(calls.length, 5);
- LOG.trace("closing a zkclient. callStack: " + Arrays.asList(calls).subList(0, min));
+ LOG.trace("closing a zkclient. callStack: " + Arrays.asList(calls));
+ }
+ getEventLock().lock();
+ try {
+ if (_connection == null) {
+ return;
+ }
+ LOG.info("Closing zkclient: " + ((ZkConnection) _connection).getZookeeper());
+ super.close();
+ LOG.info("Closed zkclient");
+ } catch (ZkInterruptedException e) {
+ /**
+ * Workaround for HELIX-264: calling ZkClient#close() in its own eventThread context will
+ * throw ZkInterruptedException and skip ZkConnection#close()
+ */
+ if (_connection != null) {
+ try {
+ /**
+ * The ZkInterruptedException constructor honors InterruptedException by calling
+ * Thread.currentThread().interrupt(); clear it first, so we can safely close the
+ * zk-connection
+ */
+ Thread.interrupted();
+ _connection.close();
+ _connection = null;
+ LOG.info("Closed zkclient");
+ // restore the interrupted status cleared above, so callers can still
+ // observe that this thread was interrupted
+ Thread.currentThread().interrupt();
+ } catch (InterruptedException e1) {
+ throw new ZkInterruptedException(e1);
+ }
+ }
+ } finally {
+ getEventLock().unlock();
+ }
- super.close();
}
public Stat getStat(final String path) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0c3796b3/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index db2a6d0..b3f6f75 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -43,9 +43,11 @@ import org.apache.helix.model.ExternalView;
import org.apache.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
+import org.testng.Assert;
public class ZkTestHelper {
private static Logger LOG = Logger.getLogger(ZkTestHelper.class);
@@ -54,6 +56,25 @@ public class ZkTestHelper {
// Logger.getRootLogger().setLevel(Level.DEBUG);
}
+ /**
+ * Simulate a zk Disconnected state change by calling {@link ZkClient#process(WatchedEvent)} directly
+ */
+ public static void simulateZkStateDisconnected(ZkClient client) {
+ WatchedEvent event = new WatchedEvent(EventType.None, KeeperState.Disconnected, null);
+ client.process(event);
+ }
+
+ /**
+ * Get zk connection session id
+ * @param client zkclient whose current zk session id is read
+ * @return current zk session id as a hex string
+ */
+ public static String getSessionId(ZkClient client) {
+ ZkConnection connection = ((ZkConnection) client.getConnection());
+ ZooKeeper curZookeeper = connection.getZookeeper();
+ return Long.toHexString(curZookeeper.getSessionId());
+ }
+
public static void disconnectSession(final ZkClient zkClient) throws Exception {
IZkStateListener listener = new IZkStateListener() {
@Override
@@ -103,12 +124,18 @@ public class ZkTestHelper {
}
+ /**
+ * Expire current zk session and wait for {@link IZkStateListener#handleNewSession()} invoked.
+ * Asserts that a different session id is in use afterwards.
+ * @param zkClient zkclient whose current session is expired
+ * @throws Exception
+ */
public static void expireSession(final ZkClient zkClient) throws Exception {
- final CountDownLatch waitExpire = new CountDownLatch(1);
+ final CountDownLatch waitNewSession = new CountDownLatch(1);
IZkStateListener listener = new IZkStateListener() {
@Override
public void handleStateChanged(KeeperState state) throws Exception {
- // System.err.println("handleStateChanged. state: " + state);
+ LOG.info("IZkStateListener#handleStateChanged, state: " + state);
}
@Override
@@ -120,7 +147,7 @@ public class ZkTestHelper {
ZooKeeper curZookeeper = connection.getZookeeper();
LOG.info("handleNewSession. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
- waitExpire.countDown();
+ waitNewSession.countDown();
}
};
@@ -128,12 +155,13 @@ public class ZkTestHelper {
ZkConnection connection = ((ZkConnection) zkClient.getConnection());
ZooKeeper curZookeeper = connection.getZookeeper();
- LOG.info("Before expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
+ String oldSessionId = Long.toHexString(curZookeeper.getSessionId());
+ LOG.info("Before session expiry. sessionId: " + oldSessionId + ", zk: " + curZookeeper);
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
- LOG.info("Process watchEvent: " + event);
+ LOG.info("Watcher#process, event: " + event);
}
};
@@ -144,17 +172,21 @@ public class ZkTestHelper {
while (dupZookeeper.getState() != States.CONNECTED) {
Thread.sleep(10);
}
+ Assert.assertEquals(dupZookeeper.getState(), States.CONNECTED,
+ "Fail to connect to zk using current session info");
dupZookeeper.close();
// make sure session expiry really happens
- waitExpire.await();
+ waitNewSession.await();
zkClient.unsubscribeStateChanges(listener);
connection = (ZkConnection) zkClient.getConnection();
curZookeeper = connection.getZookeeper();
- // System.err.println("zk: " + oldZookeeper);
- LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
+ String newSessionId = Long.toHexString(curZookeeper.getSessionId());
+ LOG.info("After session expiry. sessionId: " + newSessionId + ", zk: " + curZookeeper);
+ Assert.assertFalse(newSessionId.equals(oldSessionId), "Fail to expire current session, zk: "
+ + curZookeeper);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0c3796b3/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
new file mode 100644
index 0000000..b10fb96
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
@@ -0,0 +1,272 @@
+package org.apache.helix.manager.zk;
+
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.Verifier;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.model.LiveInstance;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestZkFlapping extends ZkUnitTestBase {
+
+ @Test
+ public void testZkSessionExpiry() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ ZkClient client =
+ new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+
+ String path = String.format("/%s", clusterName);
+ client.createEphemeral(path);
+ String oldSessionId = ZkTestHelper.getSessionId(client);
+ ZkTestHelper.expireSession(client);
+ String newSessionId = ZkTestHelper.getSessionId(client);
+ Assert.assertFalse(newSessionId.equals(oldSessionId), "Session id should change after expiry");
+ Assert.assertFalse(client.exists(path), "Ephemeral znode should be gone after session expiry");
+ client.close();
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testCloseZkClient() {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ ZkClient client =
+ new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+ String path = String.format("/%s", clusterName);
+ client.createEphemeral(path);
+
+ client.close();
+ Assert.assertFalse(_gZkClient.exists(path), "Ephemeral node: " + path
+ + " should be removed after ZkClient#close()");
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testCloseZkClientInZkClientEventThread() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ final CountDownLatch waitCallback = new CountDownLatch(1);
+ final ZkClient client =
+ new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+ String path = String.format("/%s", clusterName);
+ client.createEphemeral(path);
+ client.subscribeDataChanges(path, new IZkDataListener() {
+ @Override
+ public void handleDataDeleted(String dataPath) throws Exception {
+ }
+ @Override
+ public void handleDataChange(String dataPath, Object data) throws Exception {
+ // count down even if close() throws, so a regression fails the test instead of hanging it
+ try {
+ client.close();
+ } finally {
+ waitCallback.countDown();
+ }
+ }
+ });
+
+ client.writeData(path, new ZNRecord("test"));
+ Assert.assertTrue(waitCallback.await(30, java.util.concurrent.TimeUnit.SECONDS), "No data-change callback");
+ Assert.assertFalse(_gZkClient.exists(path), "Ephemeral node: " + path
+ + " should be removed after ZkClient#close() in its own event-thread");
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ // Counts Disconnected callbacks; volatile: written by ZkClient callbacks, polled by the tests
+ static class ZkStateCountListener implements IZkStateListener {
+ volatile int count = 0;
+ @Override
+ public void handleStateChanged(KeeperState state) throws Exception {
+ if (state == KeeperState.Disconnected) {
+ count++;
+ }
+ }
+
+ @Override
+ public void handleNewSession() throws Exception {
+ }
+ }
+
+ @Test
+ public void testParticipantFlapping() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 32, // partitions per resource
+ 1, // number of nodes
+ 1, // replicas
+ "MasterSlave", false);
+
+ final String instanceName = "localhost_12918";
+ MockParticipant participant = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ participant.syncStart();
+
+ final ZkClient client = participant.getManager().getZkClient();
+ final ZkStateCountListener listener = new ZkStateCountListener();
+ client.subscribeStateChanges(listener);
+
+ final AtomicInteger expectDisconnectCnt = new AtomicInteger(0);
+ final int n = ZKHelixManager.MAX_DISCONNECT_THRESHOLD;
+ for (int i = 0; i < n; i++) {
+ String oldSessionId = ZkTestHelper.getSessionId(client);
+ ZkTestHelper.simulateZkStateDisconnected(client);
+ expectDisconnectCnt.incrementAndGet();
+ // wait until we get invoked by zk state change to disconnected
+ Assert.assertTrue(TestHelper.verify(new Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ return listener.count == expectDisconnectCnt.get();
+ }
+ }, 30 * 1000), "Fail to get disconnect callback #" + expectDisconnectCnt.get());
+
+ String newSessionId = ZkTestHelper.getSessionId(client);
+ Assert.assertEquals(newSessionId, oldSessionId);
+ }
+ client.unsubscribeStateChanges(listener);
+ // make sure participant is NOT disconnected
+ LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
+ Assert.assertNotNull(liveInstance, "Live-instance should exist after " + n + " disconnects");
+
+ // trigger flapping
+ ZkTestHelper.simulateZkStateDisconnected(client);
+ // wait until we get invoked by zk state change to disconnected
+ boolean success = TestHelper.verify(new Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ return client.getShutdownTrigger();
+ }
+ }, 30 * 1000);
+
+ Assert.assertTrue(success, "The " + (n + 1)
+ + "th disconnect event should trigger ZKHelixManager#disconnect");
+
+ // make sure participant is disconnected
+ success = TestHelper.verify(new TestHelper.Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
+ return liveInstance == null;
+ }
+ }, 3 * 1000);
+ Assert.assertTrue(success, "Live-instance should be gone after " + (n + 1) + " disconnects");
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testControllerFlapping() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 32, // partitions per resource
+ 1, // number of nodes
+ 1, // replicas
+ "MasterSlave", false);
+
+ ClusterController controller = new ClusterController(clusterName, "controller", ZK_ADDR);
+ controller.syncStart();
+
+ final ZkClient client = controller.getManager().getZkClient();
+ final ZkStateCountListener listener = new ZkStateCountListener();
+ client.subscribeStateChanges(listener);
+
+ final AtomicInteger expectDisconnectCnt = new AtomicInteger(0);
+ final int n = ZKHelixManager.MAX_DISCONNECT_THRESHOLD;
+ for (int i = 0; i < n; i++) {
+ String oldSessionId = ZkTestHelper.getSessionId(client);
+ ZkTestHelper.simulateZkStateDisconnected(client);
+ expectDisconnectCnt.incrementAndGet();
+ // wait until we get invoked by zk state change to disconnected
+ Assert.assertTrue(TestHelper.verify(new Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ return listener.count == expectDisconnectCnt.get();
+ }
+ }, 30 * 1000), "Fail to get disconnect callback #" + expectDisconnectCnt.get());
+
+ String newSessionId = ZkTestHelper.getSessionId(client);
+ Assert.assertEquals(newSessionId, oldSessionId);
+ }
+ client.unsubscribeStateChanges(listener);
+ // make sure controller is NOT disconnected
+ LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+ Assert.assertNotNull(leader, "Leader should exist after " + n + " disconnects");
+
+ // trigger flapping
+ ZkTestHelper.simulateZkStateDisconnected(client);
+ // wait until we get invoked by zk state change to disconnected
+ boolean success = TestHelper.verify(new Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ return client.getShutdownTrigger();
+ }
+ }, 30 * 1000);
+
+ Assert.assertTrue(success, "The " + (n + 1)
+ + "th disconnect event should trigger ZKHelixManager#disconnect");
+
+ // make sure controller is disconnected
+ success = TestHelper.verify(new TestHelper.Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+ return leader == null;
+ }
+ }, 5 * 1000);
+ Assert.assertTrue(success, "Leader should be gone after " + (n + 1) + " disconnects");
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+}
\ No newline at end of file