You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/05/02 21:53:46 UTC
git commit: add test cases for zk callback handler leaking
Updated Branches:
refs/heads/master cec747d68 -> d26addcf1
add test cases for zk callback handler leaking
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/d26addcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/d26addcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/d26addcf
Branch: refs/heads/master
Commit: d26addcf159f484391d26eddaa7b695168ec8227
Parents: cec747d
Author: zzhang <zz...@uci.edu>
Authored: Thu May 2 12:53:39 2013 -0700
Committer: zzhang <zz...@uci.edu>
Committed: Thu May 2 12:53:39 2013 -0700
----------------------------------------------------------------------
.../apache/helix/manager/zk/ZKHelixManager.java | 2 +-
.../test/java/org/apache/helix/TestZkBasis.java | 161 ++++++++++++++
.../test/java/org/apache/helix/ZkTestHelper.java | 51 ++++-
.../integration/TestZkCallbackHandlerLeak.java | 165 ++++++++++++++-
4 files changed, 368 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d26addcf/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 4095796..de93035 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -259,7 +259,7 @@ public class ZKHelixManager implements HelixManager
// compare property-key path and listener reference
if (handler.getPath().equals(propertyKey.getPath()) && handler.getListener().equals(listener))
{
- // TODO add log
+ logger.info("Listener: " + listener + " on path: " + propertyKey.getPath() + " already exists. skip adding it");
return;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d26addcf/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZkBasis.java b/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
new file mode 100644
index 0000000..9d9c789
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
@@ -0,0 +1,161 @@
+package org.apache.helix;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * test zookeeper basis
+ */
+public class TestZkBasis extends ZkUnitTestBase {
+ class ZkListener implements IZkDataListener, IZkChildListener {
+ String _parentPath = null;
+ String _dataDeletePath = null;
+ List<String> _currentChilds = Collections.emptyList(); // make sure it's set to null in #handleChildChange()
+
+ CountDownLatch _childChangeCountDown = new CountDownLatch(1);
+ CountDownLatch _dataDeleteCountDown = new CountDownLatch(1);
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+ _parentPath = parentPath;
+ _currentChilds = currentChilds;
+ _childChangeCountDown.countDown();
+ }
+
+ @Override
+ public void handleDataChange(String dataPath, Object data) throws Exception {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath) throws Exception {
+ _dataDeletePath = dataPath;
+ _dataDeleteCountDown.countDown();
+ }
+ }
+ /**
+ * test zk watchers are renewed automatically after session expiry
+ *
+ * zookeeper-client side keeps all registered watchers see ZooKeeper.WatchRegistration.register()
+ * after session expiry, all watchers are renewed
+ * if a path that has watches on it has been removed during session expiry,
+ * the watchers on that path will still get callbacks after session renewal, especially:
+ * a data-watch will get data-deleted callback
+ * a child-watch will get a child-change callback with current-child-list = null
+ *
+ * this can be used for cleanup watchers on the zookeeper-client side
+ */
+ @Test
+ public void testWatchRenew() throws Exception {
+
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String testName = className + "_" + methodName;
+
+ final ZkClient client = new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+ // make sure "/testName/test" doesn't exist
+ final String path = "/" + testName + "/test";
+ client.delete(path);
+
+ ZkListener listener = new ZkListener();
+ client.subscribeDataChanges(path, listener);
+ client.subscribeChildChanges(path, listener);
+
+ ZkTestHelper.expireSession(client);
+
+ boolean succeed = listener._childChangeCountDown.await(10, TimeUnit.SECONDS);
+ Assert.assertTrue(succeed, "fail to wait on child-change count-down in 10 seconds after session-expiry");
+ Assert.assertEquals(listener._parentPath, path, "fail to get child-change callback after session-expiry");
+ Assert.assertNull(listener._currentChilds, "fail to get child-change callback with currentChilds=null after session expiry");
+
+ succeed = listener._dataDeleteCountDown.await(10, TimeUnit.SECONDS);
+ Assert.assertTrue(succeed, "fail to wait on data-delete count-down in 10 seconds after session-expiry");
+ Assert.assertEquals(listener._dataDeletePath, path, "fail to get data-delete callback after session-expiry");
+
+ client.close();
+ }
+
+ /**
+ * after calling zkclient#unsubscribeXXXListener()
+ * an already registered watch will not be removed from ZooKeeper#watchManager#XXXWatches immediately.
+ * the watch will get removed on the following conditions:
+ * 1) there is a set/delete on the listening path via the zkclient
+ * 2) session expiry on the zkclient (i.e. the watch will not be renewed after session expiry)
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testWatchRemove() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String testName = className + "_" + methodName;
+
+ final ZkClient client = new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+ // make sure "/testName/test" doesn't exist
+ final String path = "/" + testName + "/test";
+ client.createPersistent(path, true);
+
+ ZkListener listener = new ZkListener();
+ client.subscribeDataChanges(path, listener);
+ client.subscribeChildChanges(path, listener);
+
+ // listener should be in both ZkClient#_dataListener and ZkClient#_childListener set
+ Map<String, Set<IZkDataListener>> dataListenerMap = ZkTestHelper.getZkDataListener(client);
+ Assert.assertEquals(dataListenerMap.size(), 1, "ZkClient#_dataListener should have 1 listener");
+ Set<IZkDataListener> dataListenerSet = dataListenerMap.get(path);
+ Assert.assertNotNull(dataListenerSet, "ZkClient#_dataListener should have 1 listener on path: " + path);
+ Assert.assertEquals(dataListenerSet.size(), 1, "ZkClient#_dataListener should have 1 listener on path: " + path);
+
+
+ Map<String, Set<IZkChildListener>> childListenerMap = ZkTestHelper.getZkChildListener(client);
+ Assert.assertEquals(childListenerMap.size(), 1, "ZkClient#_childListener should have 1 listener");
+ Set<IZkChildListener> childListenerSet = childListenerMap.get(path);
+ Assert.assertNotNull(childListenerSet, "ZkClient#_childListener should have 1 listener on path: " + path);
+ Assert.assertEquals(childListenerSet.size(), 1, "ZkClient#_childListener should have 1 listener on path: " + path);
+
+ // watch should be in ZooKeeper#watchManager#XXXWatches
+ Map<String, List<String>> watchMap = ZkTestHelper.getZkWatch(client);
+ // System.out.println("watchMap1: " + watchMap);
+ List<String> dataWatch = watchMap.get("dataWatches");
+ Assert.assertNotNull(dataWatch, "ZooKeeper#watchManager#dataWatches should have 1 data watch on path: " + path);
+ Assert.assertEquals(dataWatch.size(), 1, "ZooKeeper#watchManager#dataWatches should have 1 data watch on path: " + path);
+ Assert.assertEquals(dataWatch.get(0), path, "ZooKeeper#watchManager#dataWatches should have 1 data watch on path: " + path);
+
+ List<String> childWatch = watchMap.get("childWatches");
+ Assert.assertNotNull(childWatch, "ZooKeeper#watchManager#childWatches should have 1 child watch on path: " + path);
+ Assert.assertEquals(childWatch.size(), 1, "ZooKeeper#watchManager#childWatches should have 1 child watch on path: " + path);
+ Assert.assertEquals(childWatch.get(0), path, "ZooKeeper#watchManager#childWatches should have 1 child watch on path: " + path);
+
+
+ client.unsubscribeDataChanges(path, listener);
+ client.unsubscribeChildChanges(path, listener);
+ // System.out.println("watchMap2: " + watchMap);
+ ZkTestHelper.expireSession(client);
+
+ // after session expiry, those watches should be removed
+ watchMap = ZkTestHelper.getZkWatch(client);
+ // System.out.println("watchMap3: " + watchMap);
+ dataWatch = watchMap.get("dataWatches");
+ Assert.assertTrue(dataWatch.isEmpty(), "ZooKeeper#watchManager#dataWatches should be empty");
+ childWatch = watchMap.get("childWatches");
+ Assert.assertTrue(childWatch.isEmpty(), "ZooKeeper#watchManager#childWatches should be empty");
+
+ client.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d26addcf/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index a5b5681..37b0664 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -23,14 +23,12 @@ import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.helix.InstanceType;
@@ -349,6 +347,49 @@ public class ZkTestHelper
}
}
+ public static Map<String, List<String>> getZkWatch(ZkClient client) throws Exception {
+ Map<String, List<String>> lists = new HashMap<String, List<String>>();
+ ZkConnection connection = ((ZkConnection) client.getConnection());
+ ZooKeeper zk = connection.getZookeeper();
+
+ java.lang.reflect.Field field = getField(zk.getClass(), "watchManager");
+ field.setAccessible(true);
+ Object watchManager = field.get(zk);
+
+ java.lang.reflect.Field field2 = getField(watchManager.getClass(), "dataWatches");
+ field2.setAccessible(true);
+ HashMap<String, Set<Watcher>> dataWatches = (HashMap<String, Set<Watcher>>) field2.get(watchManager);
+
+ field2 = getField(watchManager.getClass(), "existWatches");
+ field2.setAccessible(true);
+ HashMap<String, Set<Watcher>> existWatches = (HashMap<String, Set<Watcher>>) field2.get(watchManager);
+
+ field2 = getField(watchManager.getClass(), "childWatches");
+ field2.setAccessible(true);
+ HashMap<String, Set<Watcher>> childWatches = (HashMap<String, Set<Watcher>>) field2.get(watchManager);
+
+ lists.put("dataWatches", new ArrayList<String>(dataWatches.keySet()));
+ lists.put("existWatches", new ArrayList<String>(existWatches.keySet()));
+ lists.put("childWatches", new ArrayList<String>(childWatches.keySet()));
+
+ return lists;
+ }
+
+ public static Map<String, Set<IZkDataListener>> getZkDataListener(ZkClient client) throws Exception {
+ java.lang.reflect.Field field = getField(client.getClass(), "_dataListener");
+ field.setAccessible(true);
+ Map<String, Set<IZkDataListener>> dataListener = (Map<String, Set<IZkDataListener>>)field.get(client);
+ return dataListener;
+ }
+
+ public static Map<String, Set<IZkChildListener>> getZkChildListener(ZkClient client) throws Exception {
+ java.lang.reflect.Field field = getField(client.getClass(), "_childListener");
+ field.setAccessible(true);
+ Map<String, Set<IZkChildListener>> childListener = (Map<String, Set<IZkChildListener>>)field.get(client);
+ return childListener;
+ }
+
+
public static boolean tryWaitZkEventsCleaned(ZkClient zkclient) throws Exception {
java.lang.reflect.Field field = getField(zkclient.getClass(), "_eventThread");
field.setAccessible(true);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d26addcf/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 6359434..6b0488f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -24,13 +24,14 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
-import org.apache.helix.ZkTestHelper;
-import org.apache.helix.ZkUnitTestBase;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.*;
import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.mock.controller.ClusterController;
import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.model.CurrentState;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -303,7 +304,134 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
+ new Date(System.currentTimeMillis()));
}
- // debug
+
+ @Test
+ public void testRemoveUserCbHandlerOnPathRemoval() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 3;
+ final String zkAddr = ZK_ADDR;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, zkAddr, 12918,
+ "localhost",
+ "TestDB",
+ 1, // resource
+ 32, // partitions
+ n, // nodes
+ 2, // replicas
+ "MasterSlave",
+ true);
+
+ ClusterController controller = new ClusterController(clusterName, "controller_0", zkAddr);
+ controller.syncStart();
+
+ MockParticipant[] participants = new MockParticipant[n];
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ participants[i] = new MockParticipant(clusterName, instanceName, zkAddr, null);
+ participants[i].syncStart();
+
+ // register a controller listener on participant_0
+ if (i == 0) {
+ ZkHelixTestManager manager = participants[0].getManager();
+ manager.addCurrentStateChangeListener(new CurrentStateChangeListener() {
+ @Override
+ public void onStateChange(String instanceName, List<CurrentState> statesInfo, NotificationContext changeContext) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ System.out.println(instanceName + " on current-state change, type: " + changeContext.getType());
+ }
+ }, manager.getInstanceName(), manager.getSessionId());
+ }
+ }
+
+ Boolean result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+ clusterName));
+ Assert.assertTrue(result);
+
+ ZkHelixTestManager participantToExpire = participants[0].getManager();
+ String oldSessionId = participantToExpire.getSessionId();
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
+
+
+ // check manager#hanlders
+ Assert.assertEquals(participantToExpire.getHandlers().size(), 3, "Should have 3 handlers: CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES");
+
+ // check zkclient#listeners
+ Map<String, Set<IZkDataListener>> dataListeners = ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
+ Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(participantToExpire.getZkClient());
+ // printZkListeners(participantToExpire.getZkClient());
+ Assert.assertEquals(dataListeners.size(), 1, "Should have 1 path (CURRENTSTATE/{sessionId}/TestDB0) which has 1 data-listeners");
+ String path = keyBuilder.currentState(participantToExpire.getInstanceName(), oldSessionId, "TestDB0").getPath();
+ Assert.assertEquals(dataListeners.get(path).size(), 1, "Should have 1 data-listeners on path: " + path);
+ Assert.assertEquals(childListeners.size(), 3, "Should have 3 paths (CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES) each of which has 1 child-listener");
+ path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
+ Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path);
+ path = keyBuilder.messages(participantToExpire.getInstanceName()).getPath();
+ Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path);
+ path = keyBuilder.controller().getPath();
+ Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path);
+
+ // check zookeeper#watches on client side
+ Map<String, List<String>> watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
+ // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
+ Assert.assertEquals(watchPaths.get("dataWatches").size(), 4, "Should have 4 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, CONTROLLER, MESSAGES");
+ Assert.assertEquals(watchPaths.get("childWatches").size(), 3, "Should have 3 child-watches: CONTROLLER, MESSAGES, and CURRENTSTATE/{sessionId}");
+
+
+ // expire localhost_12918
+ System.out.println("Expire participant: " + participantToExpire.getInstanceName() + ", session: " + participantToExpire.getSessionId());
+ ZkTestHelper.expireSession(participantToExpire.getZkClient());
+ String newSessionId = participantToExpire.getSessionId();
+ System.out.println(participantToExpire.getInstanceName() + " oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
+ result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // check manager#hanlders
+ Assert.assertEquals(participantToExpire.getHandlers().size(), 2, "Should have 2 handlers: CONTROLLER and MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
+
+ // check zkclient#listeners
+ dataListeners = ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
+ childListeners = ZkTestHelper.getZkChildListener(participantToExpire.getZkClient());
+ // printZkListeners(participantToExpire.getZkClient());
+ Assert.assertTrue(dataListeners.isEmpty(), "Should have no data-listeners");
+ Assert.assertEquals(childListeners.size(), 3, "Should have 3 paths (CURRENTSTATE/{oldSessionId}, CONTROLLER, and MESSAGES). "
+ + "CONTROLLER and MESSAGE has 1 child-listener each. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())");
+ path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
+ Assert.assertEquals(childListeners.get(path).size(), 0, "Should have no child-listener on path: " + path);
+ path = keyBuilder.messages(participantToExpire.getInstanceName()).getPath();
+ Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path);
+ path = keyBuilder.controller().getPath();
+ Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path);
+
+ // check zookeeper#watches on client side
+ watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
+ // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
+ Assert.assertEquals(watchPaths.get("dataWatches").size(), 2, "Should have 2 data-watches: CONTROLLER and MESSAGES");
+ Assert.assertEquals(watchPaths.get("childWatches").size(), 2, "Should have 2 child-watches: CONTROLLER and MESSAGES");
+ Assert.assertEquals(watchPaths.get("existWatches").size(), 2, "Should have 2 exist-watches: CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0");
+
+ // another session expiry on localhost_12918 should clear the two exist-watches on CURRENTSTATE/{oldSessionId}
+ System.out.println("Expire participant: " + participantToExpire.getInstanceName() + ", session: " + participantToExpire.getSessionId());
+ ZkTestHelper.expireSession(participantToExpire.getZkClient());
+ result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // check zookeeper#watches on client side
+ watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
+ // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
+ Assert.assertEquals(watchPaths.get("dataWatches").size(), 2, "Should have 2 data-watches: CONTROLLER and MESSAGES");
+ Assert.assertEquals(watchPaths.get("childWatches").size(), 2, "Should have 2 child-watches: CONTROLLER and MESSAGES");
+ Assert.assertEquals(watchPaths.get("existWatches").size(), 0, "Should have no exist-watches. exist-watches on CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0 should be cleared during handleNewSession");
+
+ // Thread.sleep(1000);
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ // debug code
static String printHandlers(ZkHelixTestManager manager)
{
StringBuilder sb = new StringBuilder();
@@ -322,4 +450,31 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
return sb.toString();
}
+
+ void printZkListeners(ZkClient client) throws Exception{
+ Map<String, Set<IZkDataListener>> datalisteners = ZkTestHelper.getZkDataListener(client);
+ Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(client);
+
+ System.out.println("dataListeners {");
+ for (String path : datalisteners.keySet()) {
+ System.out.println("\t" + path + ": ");
+ Set<IZkDataListener> set = datalisteners.get(path);
+ for (IZkDataListener listener : set) {
+ CallbackHandler handler = (CallbackHandler)listener;
+ System.out.println("\t\t" + handler.getListener());
+ }
+ }
+ System.out.println("}");
+
+ System.out.println("childListeners {");
+ for (String path : childListeners.keySet()) {
+ System.out.println("\t" + path + ": ");
+ Set<IZkChildListener> set = childListeners.get(path);
+ for (IZkChildListener listener : set) {
+ CallbackHandler handler = (CallbackHandler)listener;
+ System.out.println("\t\t" + handler.getListener());
+ }
+ }
+ System.out.println("}");
+ }
}