You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/05/02 21:53:46 UTC

git commit: add test cases for zk callback handler leaking

Updated Branches:
  refs/heads/master cec747d68 -> d26addcf1


add test cases for zk callback handler leaking


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/d26addcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/d26addcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/d26addcf

Branch: refs/heads/master
Commit: d26addcf159f484391d26eddaa7b695168ec8227
Parents: cec747d
Author: zzhang <zz...@uci.edu>
Authored: Thu May 2 12:53:39 2013 -0700
Committer: zzhang <zz...@uci.edu>
Committed: Thu May 2 12:53:39 2013 -0700

----------------------------------------------------------------------
 .../apache/helix/manager/zk/ZKHelixManager.java    |    2 +-
 .../test/java/org/apache/helix/TestZkBasis.java    |  161 ++++++++++++++
 .../test/java/org/apache/helix/ZkTestHelper.java   |   51 ++++-
 .../integration/TestZkCallbackHandlerLeak.java     |  165 ++++++++++++++-
 4 files changed, 368 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d26addcf/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 4095796..de93035 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -259,7 +259,7 @@ public class ZKHelixManager implements HelixManager
         // compare property-key path and listener reference
         if (handler.getPath().equals(propertyKey.getPath()) && handler.getListener().equals(listener))
         {
-          // TODO add log
+          logger.info("Listener: " + listener + " on path: " + propertyKey.getPath() + " already exists. skip adding it");
           return;
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d26addcf/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZkBasis.java b/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
new file mode 100644
index 0000000..9d9c789
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
@@ -0,0 +1,161 @@
+package org.apache.helix;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * test zookeeper basis
+ */
+public class TestZkBasis extends ZkUnitTestBase {
+    class ZkListener implements  IZkDataListener, IZkChildListener {
+        String _parentPath = null;
+        String _dataDeletePath = null;
+        List<String> _currentChilds = Collections.emptyList();  // make sure it's set to null in #handleChildChange()
+
+        CountDownLatch _childChangeCountDown = new CountDownLatch(1);
+        CountDownLatch _dataDeleteCountDown = new CountDownLatch(1);
+
+        @Override
+        public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+            _parentPath = parentPath;
+            _currentChilds = currentChilds;
+            _childChangeCountDown.countDown();
+        }
+
+        @Override
+        public void handleDataChange(String dataPath, Object data) throws Exception {
+            // data-change callbacks are not checked by these tests; intentionally a no-op
+        }
+
+        @Override
+        public void handleDataDeleted(String dataPath) throws Exception {
+            _dataDeletePath = dataPath;
+            _dataDeleteCountDown.countDown();
+        }
+    }
+    /**
+     * test zk watchers are renewed automatically after session expiry
+     *
+     * the zookeeper-client side keeps all registered watchers; see ZooKeeper.WatchRegistration.register()
+     * after session expiry, all watchers are renewed
+     * if a path that has watches on it has been removed during session expiry,
+     * the watchers on that path will still get callbacks after session renewal, especially:
+     *  a data-watch will get data-deleted callback
+     *  a child-watch will get a child-change callback with current-child-list = null
+     *
+     * this can be used to clean up watchers on the zookeeper-client side
+     */
+    @Test
+    public void testWatchRenew() throws Exception {
+
+        String className = TestHelper.getTestClassName();
+        String methodName = TestHelper.getTestMethodName();
+        String testName = className + "_" + methodName;
+
+        final ZkClient client = new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
+                ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+        // make sure "/testName/test" doesn't exist
+        final String path = "/" + testName + "/test";
+        client.delete(path);
+
+        ZkListener listener = new ZkListener();
+        client.subscribeDataChanges(path, listener);
+        client.subscribeChildChanges(path, listener);
+
+        ZkTestHelper.expireSession(client);
+
+        boolean succeed = listener._childChangeCountDown.await(10, TimeUnit.SECONDS);
+        Assert.assertTrue(succeed, "fail to wait on child-change count-down in 10 seconds after session-expiry");
+        Assert.assertEquals(listener._parentPath, path, "fail to get child-change callback after session-expiry");
+        Assert.assertNull(listener._currentChilds, "fail to get child-change callback with currentChilds=null after session expiry");
+
+        succeed = listener._dataDeleteCountDown.await(10, TimeUnit.SECONDS);
+        Assert.assertTrue(succeed, "fail to wait on data-delete count-down in 10 seconds after session-expiry");
+        Assert.assertEquals(listener._dataDeletePath, path, "fail to get data-delete callback after session-expiry");
+
+        client.close();
+    }
+
+    /**
+     * after calling zkclient#unsubscribeXXXListener()
+     * an already registered watch will not be removed from ZooKeeper#watchManager#XXXWatches immediately.
+     * the watch will get removed on the following conditions:
+     *  1) there is a set/delete on the listening path via the zkclient
+     *  2) session expiry on the zkclient (i.e. the watch will not be renewed after session expiry)
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testWatchRemove() throws Exception {
+        String className = TestHelper.getTestClassName();
+        String methodName = TestHelper.getTestMethodName();
+        String testName = className + "_" + methodName;
+
+        final ZkClient client = new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
+                ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+        // make sure "/testName/test" exists before subscribing listeners on it
+        final String path = "/" + testName + "/test";
+        client.createPersistent(path, true);
+
+        ZkListener listener = new ZkListener();
+        client.subscribeDataChanges(path, listener);
+        client.subscribeChildChanges(path, listener);
+
+        // listener should be in both ZkClient#_dataListener and ZkClient#_childListener set
+        Map<String, Set<IZkDataListener>> dataListenerMap = ZkTestHelper.getZkDataListener(client);
+        Assert.assertEquals(dataListenerMap.size(), 1, "ZkClient#_dataListener should have 1 listener");
+        Set<IZkDataListener> dataListenerSet = dataListenerMap.get(path);
+        Assert.assertNotNull(dataListenerSet, "ZkClient#_dataListener should have 1 listener on path: " + path);
+        Assert.assertEquals(dataListenerSet.size(), 1, "ZkClient#_dataListener should have 1 listener on path: " + path);
+
+
+        Map<String, Set<IZkChildListener>> childListenerMap = ZkTestHelper.getZkChildListener(client);
+        Assert.assertEquals(childListenerMap.size(), 1, "ZkClient#_childListener should have 1 listener");
+        Set<IZkChildListener> childListenerSet = childListenerMap.get(path);
+        Assert.assertNotNull(childListenerSet, "ZkClient#_childListener should have 1 listener on path: " + path);
+        Assert.assertEquals(childListenerSet.size(), 1, "ZkClient#_childListener should have 1 listener on path: " + path);
+
+        // watch should be in ZooKeeper#watchManager#XXXWatches
+        Map<String, List<String>> watchMap = ZkTestHelper.getZkWatch(client);
+        // System.out.println("watchMap1: " + watchMap);
+        List<String> dataWatch = watchMap.get("dataWatches");
+        Assert.assertNotNull(dataWatch, "ZooKeeper#watchManager#dataWatches should have 1 data watch on path: " + path);
+        Assert.assertEquals(dataWatch.size(), 1, "ZooKeeper#watchManager#dataWatches should have 1 data watch on path: " + path);
+        Assert.assertEquals(dataWatch.get(0), path, "ZooKeeper#watchManager#dataWatches should have 1 data watch on path: " + path);
+
+        List<String> childWatch = watchMap.get("childWatches");
+        Assert.assertNotNull(childWatch, "ZooKeeper#watchManager#childWatches should have 1 child watch on path: " + path);
+        Assert.assertEquals(childWatch.size(), 1, "ZooKeeper#watchManager#childWatches should have 1 child watch on path: " + path);
+        Assert.assertEquals(childWatch.get(0), path, "ZooKeeper#watchManager#childWatches should have 1 child watch on path: " + path);
+
+
+        client.unsubscribeDataChanges(path, listener);
+        client.unsubscribeChildChanges(path, listener);
+        // System.out.println("watchMap2: " + watchMap);
+        ZkTestHelper.expireSession(client);
+
+        // after session expiry, those watches should be removed
+        watchMap = ZkTestHelper.getZkWatch(client);
+        // System.out.println("watchMap3: " + watchMap);
+        dataWatch = watchMap.get("dataWatches");
+        Assert.assertTrue(dataWatch.isEmpty(), "ZooKeeper#watchManager#dataWatches should be empty");
+        childWatch = watchMap.get("childWatches");
+        Assert.assertTrue(childWatch.isEmpty(), "ZooKeeper#watchManager#childWatches should be empty");
+
+        client.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d26addcf/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index a5b5681..37b0664 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -23,14 +23,12 @@ import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.net.Socket;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.helix.InstanceType;
@@ -349,6 +347,49 @@ public class ZkTestHelper
         }
     }
 
+    public static Map<String, List<String>> getZkWatch(ZkClient client) throws Exception {
+        Map<String, List<String>> lists = new HashMap<String, List<String>>();
+        ZkConnection connection = ((ZkConnection) client.getConnection());
+        ZooKeeper zk = connection.getZookeeper();
+
+        java.lang.reflect.Field field = getField(zk.getClass(), "watchManager");
+        field.setAccessible(true);
+        Object watchManager = field.get(zk);
+
+        java.lang.reflect.Field field2 = getField(watchManager.getClass(), "dataWatches");
+        field2.setAccessible(true);
+        HashMap<String, Set<Watcher>> dataWatches = (HashMap<String, Set<Watcher>>) field2.get(watchManager);
+
+        field2 = getField(watchManager.getClass(), "existWatches");
+        field2.setAccessible(true);
+        HashMap<String, Set<Watcher>> existWatches = (HashMap<String, Set<Watcher>>) field2.get(watchManager);
+
+        field2 = getField(watchManager.getClass(), "childWatches");
+        field2.setAccessible(true);
+        HashMap<String, Set<Watcher>> childWatches = (HashMap<String, Set<Watcher>>) field2.get(watchManager);
+
+        lists.put("dataWatches", new ArrayList<String>(dataWatches.keySet()));
+        lists.put("existWatches", new ArrayList<String>(existWatches.keySet()));
+        lists.put("childWatches", new ArrayList<String>(childWatches.keySet()));
+
+        return lists;
+    }
+
+    public static Map<String, Set<IZkDataListener>> getZkDataListener(ZkClient client) throws Exception {
+        java.lang.reflect.Field field = getField(client.getClass(), "_dataListener");
+        field.setAccessible(true);
+        Map<String, Set<IZkDataListener>> dataListener = (Map<String, Set<IZkDataListener>>)field.get(client);
+        return dataListener;
+    }
+
+    public static Map<String, Set<IZkChildListener>> getZkChildListener(ZkClient client) throws Exception {
+        java.lang.reflect.Field field = getField(client.getClass(), "_childListener");
+        field.setAccessible(true);
+        Map<String, Set<IZkChildListener>> childListener = (Map<String, Set<IZkChildListener>>)field.get(client);
+        return childListener;
+    }
+
+
     public static boolean tryWaitZkEventsCleaned(ZkClient zkclient) throws Exception {
         java.lang.reflect.Field field = getField(zkclient.getClass(), "_eventThread");
         field.setAccessible(true);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d26addcf/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 6359434..6b0488f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -24,13 +24,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
-import org.apache.helix.ZkTestHelper;
-import org.apache.helix.ZkUnitTestBase;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.*;
 import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.mock.controller.ClusterController;
 import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.model.CurrentState;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -303,7 +304,134 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
 	            + new Date(System.currentTimeMillis()));
 	}
 
-	// debug
+
+    @Test
+    public void testRemoveUserCbHandlerOnPathRemoval() throws  Exception {
+        String className = TestHelper.getTestClassName();
+        String methodName = TestHelper.getTestMethodName();
+        String clusterName = className + "_" + methodName;
+        final int n = 3;
+        final String zkAddr = ZK_ADDR;
+        System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+        TestHelper.setupCluster(clusterName, zkAddr, 12918,
+                "localhost",
+                "TestDB",
+                1,  // resource
+                32, // partitions
+                n,  // nodes
+                2,  // replicas
+                "MasterSlave",
+                true);
+
+        ClusterController controller = new ClusterController(clusterName, "controller_0", zkAddr);
+        controller.syncStart();
+
+        MockParticipant[] participants = new MockParticipant[n];
+        for (int i = 0; i < n; i++) {
+            String instanceName = "localhost_" + (12918 + i);
+            participants[i] = new MockParticipant(clusterName, instanceName, zkAddr, null);
+            participants[i].syncStart();
+
+            // register a current-state change listener on participant_0
+            if (i == 0) {
+                ZkHelixTestManager manager = participants[0].getManager();
+                manager.addCurrentStateChangeListener(new CurrentStateChangeListener() {
+                    @Override
+                    public void onStateChange(String instanceName, List<CurrentState> statesInfo, NotificationContext changeContext) {
+                        // only print the callback; no further handling is done here
+                        System.out.println(instanceName + " on current-state change, type: " + changeContext.getType());
+                    }
+                }, manager.getInstanceName(), manager.getSessionId());
+            }
+        }
+
+        Boolean result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+        Assert.assertTrue(result);
+
+        ZkHelixTestManager participantToExpire = participants[0].getManager();
+        String oldSessionId = participantToExpire.getSessionId();
+        PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
+
+
+        // check manager#handlers
+        Assert.assertEquals(participantToExpire.getHandlers().size(), 3, "Should have 3 handlers: CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES");
+
+        // check zkclient#listeners
+        Map<String, Set<IZkDataListener>> dataListeners = ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
+        Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(participantToExpire.getZkClient());
+        // printZkListeners(participantToExpire.getZkClient());
+        Assert.assertEquals(dataListeners.size(), 1, "Should have 1 path (CURRENTSTATE/{sessionId}/TestDB0) which has 1 data-listeners");
+        String path = keyBuilder.currentState(participantToExpire.getInstanceName(), oldSessionId, "TestDB0").getPath();
+        Assert.assertEquals(dataListeners.get(path).size(), 1, "Should have 1 data-listeners on path: " + path);
+        Assert.assertEquals(childListeners.size(), 3, "Should have 3 paths (CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES) each of which has 1 child-listener");
+        path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
+        Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path);
+        path = keyBuilder.messages(participantToExpire.getInstanceName()).getPath();
+        Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path);
+        path = keyBuilder.controller().getPath();
+        Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path);
+
+        // check zookeeper#watches on client side
+        Map<String, List<String>> watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
+        // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
+        Assert.assertEquals(watchPaths.get("dataWatches").size(), 4, "Should have 4 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, CONTROLLER, MESSAGES");
+        Assert.assertEquals(watchPaths.get("childWatches").size(), 3, "Should have 3 child-watches: CONTROLLER, MESSAGES, and CURRENTSTATE/{sessionId}");
+
+
+        // expire localhost_12918
+        System.out.println("Expire participant: " + participantToExpire.getInstanceName() + ", session: " + participantToExpire.getSessionId());
+        ZkTestHelper.expireSession(participantToExpire.getZkClient());
+        String newSessionId = participantToExpire.getSessionId();
+        System.out.println(participantToExpire.getInstanceName() + " oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
+        result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+        Assert.assertTrue(result);
+
+        // check manager#handlers
+        Assert.assertEquals(participantToExpire.getHandlers().size(), 2, "Should have 2 handlers: CONTROLLER and MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()");
+
+        // check zkclient#listeners
+        dataListeners = ZkTestHelper.getZkDataListener(participantToExpire.getZkClient());
+        childListeners = ZkTestHelper.getZkChildListener(participantToExpire.getZkClient());
+        // printZkListeners(participantToExpire.getZkClient());
+        Assert.assertTrue(dataListeners.isEmpty(), "Should have no data-listeners");
+        Assert.assertEquals(childListeners.size(), 3, "Should have 3 paths (CURRENTSTATE/{oldSessionId}, CONTROLLER, and MESSAGES). "
+                + "CONTROLLER and MESSAGE has 1 child-listener each. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())");
+        path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
+        Assert.assertEquals(childListeners.get(path).size(), 0, "Should have no child-listener on path: " + path);
+        path = keyBuilder.messages(participantToExpire.getInstanceName()).getPath();
+        Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path);
+        path = keyBuilder.controller().getPath();
+        Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path);
+
+        // check zookeeper#watches on client side
+        watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
+        // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
+        Assert.assertEquals(watchPaths.get("dataWatches").size(), 2, "Should have 2 data-watches: CONTROLLER and MESSAGES");
+        Assert.assertEquals(watchPaths.get("childWatches").size(), 2, "Should have 2 child-watches: CONTROLLER and MESSAGES");
+        Assert.assertEquals(watchPaths.get("existWatches").size(), 2, "Should have 2 exist-watches: CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0");
+
+        // another session expiry on localhost_12918 should clear the two exist-watches on CURRENTSTATE/{oldSessionId}
+        System.out.println("Expire participant: " + participantToExpire.getInstanceName() + ", session: " + participantToExpire.getSessionId());
+        ZkTestHelper.expireSession(participantToExpire.getZkClient());
+        result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr,
+                clusterName));
+        Assert.assertTrue(result);
+
+        // check zookeeper#watches on client side
+        watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
+        // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n");
+        Assert.assertEquals(watchPaths.get("dataWatches").size(), 2, "Should have 2 data-watches: CONTROLLER and MESSAGES");
+        Assert.assertEquals(watchPaths.get("childWatches").size(), 2, "Should have 2 child-watches: CONTROLLER and MESSAGES");
+        Assert.assertEquals(watchPaths.get("existWatches").size(), 0, "Should have no exist-watches. exist-watches on CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0 should be cleared during handleNewSession");
+
+        // Thread.sleep(1000);
+        System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    }
+
+	// debug code
 	static String printHandlers(ZkHelixTestManager manager) 
 	{
 		StringBuilder sb = new StringBuilder();
@@ -322,4 +450,31 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
 	    
 	    return sb.toString();
     }
+
+    void printZkListeners(ZkClient client) throws  Exception{
+        Map<String, Set<IZkDataListener>> datalisteners = ZkTestHelper.getZkDataListener(client);
+        Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(client);
+
+        System.out.println("dataListeners {");
+        for (String path : datalisteners.keySet()) {
+            System.out.println("\t" + path + ": ");
+            Set<IZkDataListener> set = datalisteners.get(path);
+            for (IZkDataListener listener : set) {
+                CallbackHandler handler = (CallbackHandler)listener;
+                System.out.println("\t\t" + handler.getListener());
+            }
+        }
+        System.out.println("}");
+
+        System.out.println("childListeners {");
+        for (String path : childListeners.keySet()) {
+            System.out.println("\t" + path + ": ");
+            Set<IZkChildListener> set = childListeners.get(path);
+            for (IZkChildListener listener : set) {
+                CallbackHandler handler = (CallbackHandler)listener;
+                System.out.println("\t\t" + handler.getListener());
+            }
+        }
+        System.out.println("}");
+    }
 }