You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/07/04 07:59:28 UTC

[helix] branch master updated: Cleanup test code and adding finalize method to avoid ZkClient leakage in the Helix-core tests. (#1138)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new bacb137  Cleanup test code and adding finalize method to avoid ZkClient leakage in the Helix-core tests. (#1138)
bacb137 is described below

commit bacb137f6fa8182cd063cbfd52305a32edc8f92c
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Sat Jul 4 00:59:19 2020 -0700

    Cleanup test code and adding finalize method to avoid ZkClient leakage in the Helix-core tests. (#1138)
    
    The ZkClient leakage is currently blocking the whole test suite to finish. This change cleans up some leaking tests and adding the finalize method to the critical ZkClient resource holders. Since we have enforced GC between test classes, these finalize methods will help to release a major part of the leakage.
    
    Note that this fix still cannot eliminate all the undesired leakages.
---
 .../main/java/org/apache/helix/ConfigAccessor.java |   5 +
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |   5 +
 .../helix/manager/zk/ZkBaseDataAccessor.java       |   5 +
 .../helix/manager/zk/ZkBucketDataAccessor.java     |   6 +
 .../helix/manager/zk/ZkCacheBaseDataAccessor.java  |   5 +
 .../java/org/apache/helix/tools/ClusterSetup.java  |   5 +
 .../apache/helix/tools/ClusterStateVerifier.java   |  22 ++-
 .../BestPossibleExternalViewVerifier.java          |   5 +
 .../ClusterVerifiers/ClusterLiveNodesVerifier.java |   5 +
 .../StrictMatchExternalViewVerifier.java           |   5 +
 .../ClusterVerifiers/ZkHelixClusterVerifier.java   |   5 +
 .../src/test/java/org/apache/helix/TestHelper.java |  39 ++---
 .../org/apache/helix/TestListenerCallback.java     |   3 +
 .../org/apache/helix/TestPropertyPathBuilder.java  |   1 -
 .../test/java/org/apache/helix/TestZKCallback.java | 183 +++++++++++----------
 .../waged/TestAssignmentMetadataStore.java         |   3 +
 .../helix/integration/TestBasicSpectator.java      |  26 +--
 .../TestCorrectnessOnConnectivityLoss.java         |  54 +++---
 .../integration/TestPartitionMovementThrottle.java |   1 +
 .../integration/common/ZkStandAloneCMTestBase.java |   3 +
 .../manager/ClusterControllerManager.java          |  13 +-
 .../manager/ClusterDistributedController.java      |  12 +-
 .../helix/integration/manager/ClusterManager.java  |  36 +++-
 .../manager/ClusterSpectatorManager.java           |  24 +--
 .../manager/MockParticipantManager.java            |   7 +-
 .../TestCrossClusterMessagingService.java          |   6 +-
 .../messaging/TestP2PNoDuplicatedMessage.java      |   5 +
 .../multizk/TestMultiZkHelixJavaApis.java          |   1 +
 .../paticipant/TestCustomizedStateUpdate.java      |  19 +--
 .../TestDelayedAutoRebalance.java                  |   3 +
 .../rebalancer/TestMixedModeAutoRebalance.java     |   5 +
 .../TestRoutingTableProviderFromCurrentStates.java |   1 +
 .../helix/integration/task/TestBatchAddJobs.java   |  12 +-
 .../integration/task/TestForceDeleteWorkflow.java  |   8 -
 .../task/TestJobQueueDeleteIdealState.java         |   5 -
 .../integration/task/TestStopAndResumeQueue.java   |  11 +-
 .../task/TestStoppingQueueFailToStop.java          |   5 -
 .../task/TestTaskSchedulingTwoCurrentStates.java   |   5 -
 .../helix/integration/task/TestTaskStopQueue.java  |  21 +--
 .../mock/TestMockMetadataStoreDirectoryServer.java |   5 +-
 40 files changed, 323 insertions(+), 267 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index 34b72c3..db8c3e1 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -993,6 +993,11 @@ public class ConfigAccessor {
     }
   }
 
+  @Override
+  public void finalize() {
+    close();
+  }
+
   public static class Builder extends GenericZkHelixApiBuilder<Builder> {
     public Builder() {
     }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index e5a5e5c..4d89b5b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -1800,6 +1800,11 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
+  public void finalize() {
+    close();
+  }
+
+  @Override
   public boolean addResourceWithWeight(String clusterName, IdealState idealState,
       ResourceConfig resourceConfig) {
     // Null checks
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 182dc2a..9ff6554 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -1300,6 +1300,11 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     }
   }
 
+  @Override
+  public void finalize() {
+    close();
+  }
+
   public static class Builder<T> extends GenericBaseDataAccessorBuilder<Builder<T>> {
     public Builder() {
     }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index 7f2e748..74eab30 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -330,6 +330,12 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
     disconnect();
   }
 
+  @Override
+  public void finalize() {
+    _zkBaseDataAccessor.close();
+    close();
+  }
+
   private synchronized void updateGCTimer(String rootPath, long currentVersion) {
     if (_gcTaskFuture != null) {
       _gcTaskFuture.cancel(false);
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index 78cc9dc..2afb51e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -836,6 +836,11 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
     }
   }
 
+  @Override
+  public void finalize() {
+    close();
+  }
+
   public static class Builder<T> extends GenericBaseDataAccessorBuilder<Builder<T>> {
     /** ZkCacheBaseDataAccessor-specific parameters */
     private String _chrootPath;
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 166a7a8..3e96ca0 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -205,6 +205,11 @@ public class ClusterSetup {
     }
   }
 
+  @Override
+  public void finalize() {
+    close();
+  }
+
   public void addCluster(String clusterName, boolean overwritePrevious, CloudConfig cloudConfig)
       throws HelixException {
     _admin.addCluster(clusterName, overwritePrevious);
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index 5d8d1a0..3faa0e7 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -149,6 +149,7 @@ public class ClusterStateVerifier {
         .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
   }
 
+  @Deprecated
   public static class BestPossAndExtViewZkVerifier implements ZkVerifier {
     private final String clusterName;
     private final Map<String, Map<String, String>> errStates;
@@ -169,7 +170,7 @@ public class ClusterStateVerifier {
       this(validateAndGetClient(zkAddr, clusterName), clusterName, errStates, resources);
     }
 
-    public BestPossAndExtViewZkVerifier(HelixZkClient zkClient, String clusterName,
+    private BestPossAndExtViewZkVerifier(HelixZkClient zkClient, String clusterName,
         Map<String, Map<String, String>> errStates, Set<String> resources) {
       if (zkClient == null || clusterName == null) {
         throw new IllegalArgumentException("requires zkClient|clusterName");
@@ -426,9 +427,16 @@ public class ClusterStateVerifier {
       verifierName = verifierName.substring(verifierName.lastIndexOf('.') + 1, verifierName.length());
       return verifierName + "(" + clusterName + "@" + zkClient.getServers() + ")";
     }
-  }
 
+    @Override
+    public void finalize() {
+      if (zkClient != null) {
+        zkClient.close();
+      }
+    }
+  }
 
+  @Deprecated
   public static class MasterNbInExtViewVerifier implements ZkVerifier {
     private final String clusterName;
     private final HelixZkClient zkClient;
@@ -437,7 +445,7 @@ public class ClusterStateVerifier {
       this(validateAndGetClient(zkAddr, clusterName), clusterName);
     }
 
-    public MasterNbInExtViewVerifier(HelixZkClient zkClient, String clusterName) {
+    private MasterNbInExtViewVerifier(HelixZkClient zkClient, String clusterName) {
       if (zkClient == null || clusterName == null) {
         throw new IllegalArgumentException("requires zkClient|clusterName");
       }
@@ -511,6 +519,13 @@ public class ClusterStateVerifier {
       }
       return true;
     }
+
+    @Override
+    public void finalize() {
+      if (zkClient != null) {
+        zkClient.close();
+      }
+    }
   }
 
   public static boolean verifyByPolling(Verifier verifier) {
@@ -743,5 +758,4 @@ public class ClusterStateVerifier {
     System.out.println(result ? "Successful" : "failed");
     System.exit(1);
   }
-
 }
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 7ea9a05..cb3a040 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -433,6 +433,11 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
        + (_resources != null ? Arrays.toString(_resources.toArray()) : "") + "])";
   }
 
+  @Override
+  public void finalize() {
+    close();
+  }
+
   private class DryrunWagedRebalancer extends org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer {
     public DryrunWagedRebalancer(String metadataStoreAddress, String clusterName,
         Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java
index 4a46f18..8c95728 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java
@@ -59,6 +59,11 @@ public class ClusterLiveNodesVerifier extends ZkHelixClusterVerifier {
     return _expectLiveNodes.equals(actualLiveNodes);
   }
 
+  @Override
+  public void finalize() {
+    close();
+  }
+
   public static class Builder extends ZkHelixClusterVerifier.Builder<Builder> {
     private final String _clusterName; // This is the ZK path sharding key
     private final Set<String> _expectLiveNodes;
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
index 7341012..c2eec49 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
@@ -340,4 +340,9 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
         .format("%s(%s@%s@resources[%s])", verifierName, _clusterName, _zkClient.getServers(),
             _resources != null ? Arrays.toString(_resources.toArray()) : "");
   }
+
+  @Override
+  public void finalize() {
+    close();
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
index e2022e7..a71184f 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
@@ -384,4 +384,9 @@ public abstract class ZkHelixClusterVerifier
       }
     }
   }
+
+  @Override
+  public void finalize() {
+    close();
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index 18d8aae..19d13f0 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -268,32 +268,33 @@ public class TestHelper {
 
   public static void setupCluster(String clusterName, String zkAddr, int startPort,
       String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb,
-      int nodesNb, int replica, String stateModelDef, RebalanceMode mode, boolean doRebalance)
-      throws Exception {
+      int nodesNb, int replica, String stateModelDef, RebalanceMode mode, boolean doRebalance) {
     HelixZkClient zkClient = SharedZkClientFactory.getInstance()
         .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
-    if (zkClient.exists("/" + clusterName)) {
-      LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
-      zkClient.deleteRecursively("/" + clusterName);
-    }
+    try {
+      if (zkClient.exists("/" + clusterName)) {
+        LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
+        zkClient.deleteRecursively("/" + clusterName);
+      }
 
-    ClusterSetup setupTool = new ClusterSetup(zkAddr);
-    setupTool.addCluster(clusterName, true);
+      ClusterSetup setupTool = new ClusterSetup(zkAddr);
+      setupTool.addCluster(clusterName, true);
 
-    for (int i = 0; i < nodesNb; i++) {
-      int port = startPort + i;
-      setupTool.addInstanceToCluster(clusterName, participantNamePrefix + "_" + port);
-    }
+      for (int i = 0; i < nodesNb; i++) {
+        int port = startPort + i;
+        setupTool.addInstanceToCluster(clusterName, participantNamePrefix + "_" + port);
+      }
 
-    for (int i = 0; i < resourceNb; i++) {
-      String resourceName = resourceNamePrefix + i;
-      setupTool.addResourceToCluster(clusterName, resourceName, partitionNb, stateModelDef,
-          mode.toString());
-      if (doRebalance) {
-        setupTool.rebalanceStorageCluster(clusterName, resourceName, replica);
+      for (int i = 0; i < resourceNb; i++) {
+        String resourceName = resourceNamePrefix + i;
+        setupTool.addResourceToCluster(clusterName, resourceName, partitionNb, stateModelDef, mode.toString());
+        if (doRebalance) {
+          setupTool.rebalanceStorageCluster(clusterName, resourceName, replica);
+        }
       }
+    } finally {
+      zkClient.close();
     }
-    zkClient.close();
   }
 
   public static void dropCluster(String clusterName, RealmAwareZkClient zkClient) {
diff --git a/helix-core/src/test/java/org/apache/helix/TestListenerCallback.java b/helix-core/src/test/java/org/apache/helix/TestListenerCallback.java
index d53aed8..87ed70d 100644
--- a/helix-core/src/test/java/org/apache/helix/TestListenerCallback.java
+++ b/helix-core/src/test/java/org/apache/helix/TestListenerCallback.java
@@ -119,6 +119,9 @@ public class TestListenerCallback extends ZkUnitTestBase {
 
   @AfterClass
   public void afterClass() throws Exception {
+    if (_manager != null && _manager.isConnected()) {
+      _manager.disconnect();
+    }
     TestHelper.dropCluster(_clusterName, _gZkClient);
     System.out.println("END " + _clusterName + " at " + new Date(System.currentTimeMillis()));
   }
diff --git a/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java b/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java
index d414d54..b4cc19e 100644
--- a/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java
+++ b/helix-core/src/test/java/org/apache/helix/TestPropertyPathBuilder.java
@@ -22,7 +22,6 @@ package org.apache.helix;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
-@Test
 public class TestPropertyPathBuilder {
   @Test
   public void testGetPath() {
diff --git a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
index 57c2579..69d37aa 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
@@ -21,7 +21,6 @@ package org.apache.helix;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.Set;
 import java.util.UUID;
 
 import org.apache.helix.PropertyKey.Builder;
@@ -141,94 +140,100 @@ public class TestZKCallback extends ZkUnitTestBase {
             InstanceType.PARTICIPANT, ZK_ADDR);
     testHelixManager.connect();
 
-    TestZKCallback test = new TestZKCallback();
-
-    TestZKCallback.TestCallbackListener testListener = test.new TestCallbackListener();
-
-    testHelixManager.addMessageListener(testListener, "localhost_8900");
-    testHelixManager.addCurrentStateChangeListener(testListener, "localhost_8900",
-        testHelixManager.getSessionId());
-    testHelixManager.addCustomizedStateRootChangeListener(testListener, "localhost_8900");
-    testHelixManager.addConfigChangeListener(testListener);
-    testHelixManager.addIdealStateChangeListener(testListener);
-    testHelixManager.addExternalViewChangeListener(testListener);
-    testHelixManager.addLiveInstanceChangeListener(testListener);
-    // Initial add listener should trigger the first execution of the
-    // listener callbacks
-    AssertJUnit.assertTrue(testListener.configChangeReceived
-        & testListener.currentStateChangeReceived & testListener.externalViewChangeReceived
-        & testListener.idealStateChangeReceived & testListener.liveInstanceChangeReceived
-        & testListener.messageChangeReceived);
-
-    testListener.Reset();
-    HelixDataAccessor accessor = testHelixManager.getHelixDataAccessor();
-    Builder keyBuilder = accessor.keyBuilder();
-
-    ExternalView extView = new ExternalView("db-12345");
-    accessor.setProperty(keyBuilder.externalView("db-12345"), extView);
-    Thread.sleep(100);
-    AssertJUnit.assertTrue(testListener.externalViewChangeReceived);
-    testListener.Reset();
-
-    CurrentState curState = new CurrentState("db-12345");
-    curState.setSessionId("sessionId");
-    curState.setStateModelDefRef("StateModelDef");
-    accessor.setProperty(keyBuilder.currentState("localhost_8900", testHelixManager.getSessionId(),
-        curState.getId()), curState);
-    Thread.sleep(100);
-    AssertJUnit.assertTrue(testListener.currentStateChangeReceived);
-    testListener.Reset();
-
-    IdealState idealState = new IdealState("db-1234");
-    idealState.setNumPartitions(400);
-    idealState.setReplicas(Integer.toString(2));
-    idealState.setStateModelDefRef("StateModeldef");
-    accessor.setProperty(keyBuilder.idealStates("db-1234"), idealState);
-    Thread.sleep(100);
-    AssertJUnit.assertTrue(testListener.idealStateChangeReceived);
-    testListener.Reset();
-
-    // dummyRecord = new ZNRecord("db-12345");
-    // dataAccessor.setProperty(PropertyType.IDEALSTATES, idealState, "db-12345"
-    // );
-    // Thread.sleep(100);
-    // AssertJUnit.assertTrue(testListener.idealStateChangeReceived);
-    // testListener.Reset();
-
-    // dummyRecord = new ZNRecord("localhost:8900");
-    // List<ZNRecord> recList = new ArrayList<ZNRecord>();
-    // recList.add(dummyRecord);
-
-    testListener.Reset();
-    Message message = new Message(MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
-    message.setTgtSessionId("*");
-    message.setResourceName("testResource");
-    message.setPartitionName("testPartitionKey");
-    message.setStateModelDef("MasterSlave");
-    message.setToState("toState");
-    message.setFromState("fromState");
-    message.setTgtName("testTarget");
-    message.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
-
-    accessor.setProperty(keyBuilder.message("localhost_8900", message.getId()), message);
-    Thread.sleep(500);
-    AssertJUnit.assertTrue(testListener.messageChangeReceived);
-
-    // dummyRecord = new ZNRecord("localhost_9801");
-    LiveInstance liveInstance = new LiveInstance("localhost_9801");
-    liveInstance.setSessionId(UUID.randomUUID().toString());
-    liveInstance.setHelixVersion(UUID.randomUUID().toString());
-    accessor.setProperty(keyBuilder.liveInstance("localhost_9801"), liveInstance);
-    Thread.sleep(500);
-    AssertJUnit.assertTrue(testListener.liveInstanceChangeReceived);
-    testListener.Reset();
-
-    // dataAccessor.setNodeConfigs(recList); Thread.sleep(100);
-    // AssertJUnit.assertTrue(testListener.configChangeReceived);
-    // testListener.Reset();
-
-    accessor.removeProperty(keyBuilder.liveInstance("localhost_8900"));
-    accessor.removeProperty(keyBuilder.liveInstance("localhost_9801"));
+    try {
+      TestZKCallback test = new TestZKCallback();
+
+      TestZKCallback.TestCallbackListener testListener = test.new TestCallbackListener();
+
+      testHelixManager.addMessageListener(testListener, "localhost_8900");
+      testHelixManager.addCurrentStateChangeListener(testListener, "localhost_8900",
+          testHelixManager.getSessionId());
+      testHelixManager.addCustomizedStateRootChangeListener(testListener, "localhost_8900");
+      testHelixManager.addConfigChangeListener(testListener);
+      testHelixManager.addIdealStateChangeListener(testListener);
+      testHelixManager.addExternalViewChangeListener(testListener);
+      testHelixManager.addLiveInstanceChangeListener(testListener);
+      // Initial add listener should trigger the first execution of the
+      // listener callbacks
+      AssertJUnit.assertTrue(
+          testListener.configChangeReceived & testListener.currentStateChangeReceived & testListener.externalViewChangeReceived
+              & testListener.idealStateChangeReceived & testListener.liveInstanceChangeReceived & testListener.messageChangeReceived);
+
+      testListener.Reset();
+      HelixDataAccessor accessor = testHelixManager.getHelixDataAccessor();
+      Builder keyBuilder = accessor.keyBuilder();
+
+      ExternalView extView = new ExternalView("db-12345");
+      accessor.setProperty(keyBuilder.externalView("db-12345"), extView);
+      Thread.sleep(100);
+      AssertJUnit.assertTrue(testListener.externalViewChangeReceived);
+      testListener.Reset();
+
+      CurrentState curState = new CurrentState("db-12345");
+      curState.setSessionId("sessionId");
+      curState.setStateModelDefRef("StateModelDef");
+      accessor.setProperty(keyBuilder
+              .currentState("localhost_8900", testHelixManager.getSessionId(), curState.getId()),
+          curState);
+      Thread.sleep(100);
+      AssertJUnit.assertTrue(testListener.currentStateChangeReceived);
+      testListener.Reset();
+
+      IdealState idealState = new IdealState("db-1234");
+      idealState.setNumPartitions(400);
+      idealState.setReplicas(Integer.toString(2));
+      idealState.setStateModelDefRef("StateModeldef");
+      accessor.setProperty(keyBuilder.idealStates("db-1234"), idealState);
+      Thread.sleep(100);
+      AssertJUnit.assertTrue(testListener.idealStateChangeReceived);
+      testListener.Reset();
+
+      // dummyRecord = new ZNRecord("db-12345");
+      // dataAccessor.setProperty(PropertyType.IDEALSTATES, idealState, "db-12345"
+      // );
+      // Thread.sleep(100);
+      // AssertJUnit.assertTrue(testListener.idealStateChangeReceived);
+      // testListener.Reset();
+
+      // dummyRecord = new ZNRecord("localhost:8900");
+      // List<ZNRecord> recList = new ArrayList<ZNRecord>();
+      // recList.add(dummyRecord);
+
+      testListener.Reset();
+      Message message = new Message(MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
+      message.setTgtSessionId("*");
+      message.setResourceName("testResource");
+      message.setPartitionName("testPartitionKey");
+      message.setStateModelDef("MasterSlave");
+      message.setToState("toState");
+      message.setFromState("fromState");
+      message.setTgtName("testTarget");
+      message.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+
+      accessor.setProperty(keyBuilder.message("localhost_8900", message.getId()), message);
+      Thread.sleep(500);
+      AssertJUnit.assertTrue(testListener.messageChangeReceived);
+
+      // dummyRecord = new ZNRecord("localhost_9801");
+      LiveInstance liveInstance = new LiveInstance("localhost_9801");
+      liveInstance.setSessionId(UUID.randomUUID().toString());
+      liveInstance.setHelixVersion(UUID.randomUUID().toString());
+      accessor.setProperty(keyBuilder.liveInstance("localhost_9801"), liveInstance);
+      Thread.sleep(500);
+      AssertJUnit.assertTrue(testListener.liveInstanceChangeReceived);
+      testListener.Reset();
+
+      // dataAccessor.setNodeConfigs(recList); Thread.sleep(100);
+      // AssertJUnit.assertTrue(testListener.configChangeReceived);
+      // testListener.Reset();
+
+      accessor.removeProperty(keyBuilder.liveInstance("localhost_8900"));
+      accessor.removeProperty(keyBuilder.liveInstance("localhost_9801"));
+    } finally {
+      if (testHelixManager.isConnected()) {
+        testHelixManager.disconnect();
+      }
+    }
   }
 
   @BeforeClass()
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
index 79a9f06..36a5b6b 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
@@ -72,6 +72,9 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
     if (_store != null) {
       _store.close();
     }
+    if (_manager != null) {
+      _manager.disconnect();
+    }
     _gSetupTool.deleteCluster(CLUSTER_NAME);
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java b/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java
index 2fa8c9f..9f91b6c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java
@@ -43,21 +43,25 @@ public class TestBasicSpectator extends ZkStandAloneCMTestBase implements
     HelixManager relayHelixManager =
         HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, null, InstanceType.SPECTATOR, ZK_ADDR);
 
-    relayHelixManager.connect();
-    relayHelixManager.addExternalViewChangeListener(this);
+    try {
+      relayHelixManager.connect();
+      relayHelixManager.addExternalViewChangeListener(this);
 
-    _gSetupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL);
-    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, "NextDB", 3);
+      _gSetupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, "NextDB", 3);
 
-    boolean result =
-        ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
-            ZK_ADDR, CLUSTER_NAME));
+      boolean result = ClusterStateVerifier.verifyByPolling(
+          new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
 
-    Assert.assertTrue(result);
-
-    Assert.assertTrue(_externalViewChanges.containsKey("NextDB"));
-    Assert.assertTrue(_externalViewChanges.containsKey(TEST_DB));
+      Assert.assertTrue(result);
 
+      Assert.assertTrue(_externalViewChanges.containsKey("NextDB"));
+      Assert.assertTrue(_externalViewChanges.containsKey(TEST_DB));
+    } finally {
+      if (relayHelixManager.isConnected()) {
+        relayHelixManager.disconnect();
+      }
+    }
   }
 
   @Override
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
index f97b10f..14bcb07 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
@@ -87,34 +87,35 @@ public class TestCorrectnessOnConnectivityLoss {
     HelixManager participant =
         HelixManagerFactory.getZKHelixManager(_clusterName, "localhost_12918",
             InstanceType.PARTICIPANT, ZK_ADDR);
-    participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
-        new MyStateModelFactory(stateReachedCounts));
-    participant.connect();
 
-    Thread.sleep(1000);
+    try {
+      participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline", new MyStateModelFactory(stateReachedCounts));
+      participant.connect();
 
-    // Ensure that the external view coalesces
-    boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-            _clusterName));
-    Assert.assertTrue(result);
+      Thread.sleep(1000);
 
-    // Ensure that there was only one state transition
-    Assert.assertEquals(stateReachedCounts.size(), 1);
-    Assert.assertTrue(stateReachedCounts.containsKey("ONLINE"));
-    Assert.assertEquals(stateReachedCounts.get("ONLINE").intValue(), 1);
+      // Ensure that the external view coalesces
+      boolean result = ClusterStateVerifier
+          .verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, _clusterName));
+      Assert.assertTrue(result);
 
-    // Now let's stop the ZK server; this should do nothing
-    TestHelper.stopZkServer(_zkServer);
-    Thread.sleep(1000);
+      // Ensure that there was only one state transition
+      Assert.assertEquals(stateReachedCounts.size(), 1);
+      Assert.assertTrue(stateReachedCounts.containsKey("ONLINE"));
+      Assert.assertEquals(stateReachedCounts.get("ONLINE").intValue(), 1);
 
-    // Verify no change
-    Assert.assertEquals(stateReachedCounts.size(), 1);
-    Assert.assertTrue(stateReachedCounts.containsKey("ONLINE"));
-    Assert.assertEquals(stateReachedCounts.get("ONLINE").intValue(), 1);
+      // Now let's stop the ZK server; this should do nothing
+      TestHelper.stopZkServer(_zkServer);
+      Thread.sleep(1000);
 
-    if (participant.isConnected()) {
-      participant.disconnect();
+      // Verify no change
+      Assert.assertEquals(stateReachedCounts.size(), 1);
+      Assert.assertTrue(stateReachedCounts.containsKey("ONLINE"));
+      Assert.assertEquals(stateReachedCounts.get("ONLINE").intValue(), 1);
+    } finally {
+      if (participant.isConnected()) {
+        participant.disconnect();
+      }
     }
   }
 
@@ -125,12 +126,13 @@ public class TestCorrectnessOnConnectivityLoss {
     HelixManager participant =
         HelixManagerFactory.getZKHelixManager(_clusterName, "localhost_12918",
             InstanceType.PARTICIPANT, ZK_ADDR);
-    participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
-        new MyStateModelFactory(stateReachedCounts));
-    participant.connect();
-
     RoutingTableProvider routingTableProvider = new RoutingTableProvider();
+
     try {
+      participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
+          new MyStateModelFactory(stateReachedCounts));
+      participant.connect();
+
       HelixManager spectator = HelixManagerFactory
           .getZKHelixManager(_clusterName, "spectator", InstanceType.SPECTATOR, ZK_ADDR);
       spectator.connect();
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
index e7cf412..17448d9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
@@ -93,6 +93,7 @@ public class TestPartitionMovementThrottle extends ZkStandAloneCMTestBase {
 
     _clusterVerifier =
         new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
   }
 
   private void setupThrottleConfig() {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/common/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/common/ZkStandAloneCMTestBase.java
index 9cbe654..acef349 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/common/ZkStandAloneCMTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/common/ZkStandAloneCMTestBase.java
@@ -92,6 +92,9 @@ public class ZkStandAloneCMTestBase extends ZkTestBase {
 
   @AfterClass
   public void afterClass() throws Exception {
+    if (_clusterVerifier != null) {
+      _clusterVerifier.close();
+    }
     /*
      * shutdown order: 1) disconnect the controller 2) disconnect participants
      */
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
index f7bae1f..ee8de4b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
@@ -19,19 +19,10 @@ package org.apache.helix.integration.manager;
  * under the License.
  */
 
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.helix.HelixTimerTask;
 import org.apache.helix.InstanceType;
-import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
-import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * The standalone cluster controller class
  */
@@ -46,4 +37,8 @@ public class ClusterControllerManager extends ClusterManager {
     super(zkAddr, clusterName, controllerName, InstanceType.CONTROLLER);
   }
 
+  @Override
+  public void finalize() {
+    super.finalize();
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
index d781854..a57a0c6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
@@ -19,16 +19,9 @@ package org.apache.helix.integration.manager;
  * under the License.
  */
 
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
 import org.apache.helix.InstanceType;
-import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.participant.DistClusterControllerStateModelFactory;
 import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,4 +51,9 @@ public class ClusterDistributedController extends ClusterManager {
       _waitStopFinishCountDown.countDown();
     }
   }
+
+  @Override
+  public void finalize() {
+    super.finalize();
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterManager.java
index 9e21ea8..63a6be3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterManager.java
@@ -29,9 +29,13 @@ import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class ClusterManager extends ZKHelixManager implements Runnable, ZkTestManager {
   private static Logger LOG = LoggerFactory.getLogger(ClusterControllerManager.class);
+  private static final int DISCONNECT_WAIT_TIME_MS = 3000;
+
+  private final String _clusterName;
+  private final String _instanceName;
+  private final InstanceType _type;
 
   protected CountDownLatch _startCountDown = new CountDownLatch(1);
   protected CountDownLatch _stopCountDown = new CountDownLatch(1);
@@ -39,8 +43,14 @@ public class ClusterManager extends ZKHelixManager implements Runnable, ZkTestMa
 
   protected boolean _started = false;
 
-  protected ClusterManager(String zkAddr, String clusterName, String instanceName, InstanceType type) {
+  protected Thread _watcher;
+
+  protected ClusterManager(String zkAddr, String clusterName, String instanceName,
+      InstanceType type) {
     super(clusterName, instanceName, type, zkAddr);
+    _clusterName = clusterName;
+    _instanceName = instanceName;
+    _type = type;
   }
 
   public void syncStop() {
@@ -62,7 +72,11 @@ public class ClusterManager extends ZKHelixManager implements Runnable, ZkTestMa
       _started = true;
     }
 
-    new Thread(this).start();
+    _watcher = new Thread(this);
+    _watcher.setName(String
+        .format("ClusterManager_Watcher_%s_%s_%s", _clusterName, _instanceName, _type.name()));
+    _watcher.start();
+
     try {
       _startCountDown.await();
     } catch (InterruptedException e) {
@@ -94,5 +108,21 @@ public class ClusterManager extends ZKHelixManager implements Runnable, ZkTestMa
   public List<CallbackHandler> getHandlers() {
     return _handlers;
   }
+
+  @Override
+  public void finalize() {
+    _watcher.interrupt();
+    try {
+      _watcher.join(DISCONNECT_WAIT_TIME_MS);
+    } catch (InterruptedException e) {
+      LOG.error("ClusterManager watcher cleanup in the finalize method was interrupted.", e);
+    } finally {
+      if (isConnected()) {
+        LOG.warn(
+            "The HelixManager ({}-{}-{}) is still connected after {} ms wait. This is a potential resource leakage!",
+            _clusterName, _instanceName, _type.name(), DISCONNECT_WAIT_TIME_MS);
+      }
+    }
+  }
 }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterSpectatorManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterSpectatorManager.java
index 2975f0a..50f080c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterSpectatorManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterSpectatorManager.java
@@ -19,26 +19,9 @@ package org.apache.helix.integration.manager;
  * under the License.
  */
 
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
 import org.apache.helix.InstanceType;
-import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class ClusterSpectatorManager extends ClusterManager{
-  private static Logger LOG = LoggerFactory.getLogger(ClusterControllerManager.class);
-
-  private final CountDownLatch _startCountDown = new CountDownLatch(1);
-  private final CountDownLatch _stopCountDown = new CountDownLatch(1);
-  private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
-
-  private boolean _started = false;
 
+public class ClusterSpectatorManager extends ClusterManager {
   public ClusterSpectatorManager(String zkAddr, String clusterName) {
     this(zkAddr, clusterName, "spectator");
   }
@@ -46,4 +29,9 @@ public class ClusterSpectatorManager extends ClusterManager{
   public ClusterSpectatorManager(String zkAddr, String clusterName, String spectatorName) {
     super(zkAddr, clusterName, spectatorName, InstanceType.SPECTATOR);
   }
+
+  @Override
+  public void finalize() {
+    super.finalize();
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 0b1983a..c156332 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -123,4 +123,9 @@ public class MockParticipantManager extends ClusterManager {
   public List<CallbackHandler> getHandlers() {
     return _handlers;
   }
-}
\ No newline at end of file
+
+  @Override
+  public void finalize() {
+    super.finalize();
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestCrossClusterMessagingService.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestCrossClusterMessagingService.java
index 64dbbea..8951e8e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestCrossClusterMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestCrossClusterMessagingService.java
@@ -30,6 +30,8 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.AssertJUnit;
 import org.testng.annotations.AfterClass;
@@ -54,9 +56,9 @@ public class TestCrossClusterMessagingService extends TestMessagingService {
     _adminController = new ClusterControllerManager(ZK_ADDR, ADMIN_CLUSTER_NAME, controllerName);
     _adminController.syncStart();
 
-    _clusterVerifier =
+    ZkHelixClusterVerifier adminClusterVerifier =
         new BestPossibleExternalViewVerifier.Builder(ADMIN_CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    Assert.assertTrue(adminClusterVerifier.verifyByPolling());
   }
 
   @AfterClass
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
index a910f28..0ef34a7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
@@ -248,6 +248,11 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
       // helix manager is connected. Thus we do not do connected check here.
       return _messagingService;
     }
+
+    @Override
+    public void finalize() {
+      super.finalize();
+    }
   }
 
   static class MockMessagingService extends DefaultMessagingService {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
index d4bdcf9..4395586 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
@@ -177,6 +177,7 @@ public class TestMultiZkHelixJavaApis {
       }, TestHelper.WAIT_DURATION));
 
       // Tear down zookeepers
+      ZK_CLIENT_MAP.forEach((zkAddress, zkClient) -> zkClient.close());
       ZK_SERVER_MAP.forEach((zkAddress, zkServer) -> zkServer.shutdown());
 
       // Stop MockMSDS
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
index 740f2fd..0102375 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
@@ -21,18 +21,13 @@ package org.apache.helix.integration.paticipant;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
-import java.util.stream.Collectors;
 
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.customizedstate.CustomizedStateProvider;
@@ -42,12 +37,10 @@ import org.apache.helix.model.CustomizedState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-
 public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
   private static Logger LOG = LoggerFactory.getLogger(TestCustomizedStateUpdate.class);
   private final String CUSTOMIZE_STATE_NAME = "testState1";
@@ -55,7 +48,6 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
   private final String PARTITION_NAME2 = "testPartition2";
   private final String RESOURCE_NAME = "testResource1";
   private final String PARTITION_STATE = "partitionState";
-  private static HelixManager _manager;
   private static CustomizedStateProvider _mockProvider;
   private PropertyKey _propertyKey;
   private HelixDataAccessor _dataAccessor;
@@ -63,9 +55,6 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
   @BeforeClass
   public void beforeClass() throws Exception {
     super.beforeClass();
-    _manager = HelixManagerFactory
-        .getZKHelixManager(CLUSTER_NAME, "admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
-    _manager.connect();
     _participants[0].connect();
     _mockProvider = CustomizedStateProviderFactory.getInstance()
         .buildCustomizedStateProvider(_manager, _participants[0].getInstanceName());
@@ -74,12 +63,6 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
         .customizedStates(_participants[0].getInstanceName(), CUSTOMIZE_STATE_NAME);
   }
 
-  @AfterClass
-  public void afterClass() throws Exception {
-    super.afterClass();
-    _manager.disconnect();
-  }
-
   @BeforeMethod
   public void beforeMethod() {
     _dataAccessor.removeProperty(_propertyKey);
@@ -154,7 +137,7 @@ public class TestCustomizedStateUpdate extends ZkStandAloneCMTestBase {
     mapView = customizedState.getRecord().getMapFields();
     Assert.assertEquals(mapView.keySet().size(), 2);
     Assert.assertEqualsNoOrder(mapView.keySet().toArray(),
-        new String[]{PARTITION_NAME1, PARTITION_NAME2});
+        new String[] { PARTITION_NAME1, PARTITION_NAME2 });
 
     Map<String, String> partitionMap1 = _mockProvider
         .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
index afcabdf..396590a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
@@ -323,6 +323,9 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
 
   @AfterClass
   public void afterClass() throws Exception {
+    if (_clusterVerifier != null) {
+      _clusterVerifier.close();
+    }
     /*
       shutdown order: 1) disconnect the controller 2) disconnect participants
      */
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
index db51fd5..6fa5ba9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
@@ -289,6 +289,11 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
       super(zkAddr, clusterName, instanceName);
       _msModelFactory = new MockDelayMSStateModelFactory();
     }
+
+    @Override
+    public void finalize() {
+      super.finalize();
+    }
   }
 
   public static class MockDelayMSStateModelFactory extends MockMSModelFactory {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
index ab88cea..a7e6dfb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
@@ -271,6 +271,7 @@ public class TestRoutingTableProviderFromCurrentStates extends ZkTestBase {
             new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName);
         _participants[shutdownParticipantIndex].syncStart();
       }
+      helixManager.disconnect();
     }
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestBatchAddJobs.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestBatchAddJobs.java
index 8e02921..b033af6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestBatchAddJobs.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestBatchAddJobs.java
@@ -88,6 +88,7 @@ public class TestBatchAddJobs extends ZkTestBase {
   public void afterClass() {
     for (SubmitJobTask submitJobTask : _submitJobTasks) {
       submitJobTask.interrupt();
+      submitJobTask.cleanup();
     }
 
     deleteCluster(CLUSTER_NAME);
@@ -96,12 +97,13 @@ public class TestBatchAddJobs extends ZkTestBase {
   static class SubmitJobTask extends Thread {
     private TaskDriver _driver;
     private String _jobPrefixName;
+    private HelixManager _manager;
 
     public SubmitJobTask(String zkAddress, int index) throws Exception {
-      HelixManager manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Administrator",
+      _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Administrator",
           InstanceType.ADMINISTRATOR, zkAddress);
-      manager.connect();
-      _driver = new TaskDriver(manager);
+      _manager.connect();
+      _driver = new TaskDriver(_manager);
       _jobPrefixName = "JOB_" + index + "#";
     }
 
@@ -118,5 +120,9 @@ public class TestBatchAddJobs extends ZkTestBase {
 
       _driver.enqueueJobs(QUEUE_NAME, jobNames, jobConfigBuilders);
     }
+
+    public void cleanup() {
+      _manager.disconnect();
+    }
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java
index 6379b51..0770069 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java
@@ -59,8 +59,6 @@ public class TestForceDeleteWorkflow extends TaskTestBase {
   // Long delay to simulate the tasks that are stuck in Task.cancel().
   private static final String STOP_DELAY = "1000000";
   private HelixAdmin _admin;
-  protected HelixManager _manager;
-  protected TaskDriver _driver;
 
   // These AtomicIntegers are used to verify that the tasks are indeed stuck in Task.cancel().
   // CANCEL shows that the task cancellation process has been started. (Incremented at the beginning
@@ -99,12 +97,6 @@ public class TestForceDeleteWorkflow extends TaskTestBase {
       _participants[i].syncStart();
     }
 
-    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
-        InstanceType.ADMINISTRATOR, ZK_ADDR);
-    _manager.connect();
-
-    _driver = new TaskDriver(_manager);
-
     _admin = _gSetupTool.getClusterManagementTool();
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueDeleteIdealState.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueDeleteIdealState.java
index 1e30b55..13b08f8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueDeleteIdealState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueDeleteIdealState.java
@@ -51,11 +51,6 @@ public class TestJobQueueDeleteIdealState extends TaskTestBase {
     _numPartitions = 1;
     _numNodes = 3;
     super.beforeClass();
-    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
-        InstanceType.ADMINISTRATOR, ZK_ADDR);
-
-    _manager.connect();
-    _driver = new TaskDriver(_manager);
   }
 
   @Test
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopAndResumeQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopAndResumeQueue.java
index fa19ce8..16e38b9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopAndResumeQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopAndResumeQueue.java
@@ -19,19 +19,16 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
 import org.apache.helix.TestHelper;
 import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
 
 /**
  * Test to check if context of the queue gets properly updated even when there is no job queued in
@@ -45,10 +42,6 @@ public class TestStopAndResumeQueue extends TaskTestBase {
     _numPartitions = 1;
     _numNodes = 3;
     super.beforeClass();
-    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
-        InstanceType.ADMINISTRATOR, ZK_ADDR);
-    _manager.connect();
-    _driver = new TaskDriver(_manager);
   }
 
   @Test
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStoppingQueueFailToStop.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStoppingQueueFailToStop.java
index ff08107..050534c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestStoppingQueueFailToStop.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStoppingQueueFailToStop.java
@@ -56,8 +56,6 @@ public class TestStoppingQueueFailToStop extends TaskTestBase {
     _numPartitions = 1;
     _numNodes = 3;
     super.beforeClass();
-    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
-        InstanceType.ADMINISTRATOR, ZK_ADDR);
 
     // Stop participants that have been started in super class
     for (int i = 0; i < _numNodes; i++) {
@@ -79,9 +77,6 @@ public class TestStoppingQueueFailToStop extends TaskTestBase {
           new TaskStateModelFactory(_participants[i], taskFactoryReg));
       _participants[i].syncStart();
     }
-
-    _manager.connect();
-    _driver = new TaskDriver(_manager);
   }
 
   @Test
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
index 85996e7..a40ce45 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskSchedulingTwoCurrentStates.java
@@ -75,8 +75,6 @@ public class TestTaskSchedulingTwoCurrentStates extends TaskTestBase {
     _numPartitions = 1;
     _numNodes = 3;
     super.beforeClass();
-    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
-        InstanceType.ADMINISTRATOR, ZK_ADDR);
 
     // Stop participants that have been started in super class
     for (int i = 0; i < _numNodes; i++) {
@@ -98,9 +96,6 @@ public class TestTaskSchedulingTwoCurrentStates extends TaskTestBase {
           new TaskStateModelFactory(_participants[i], taskFactoryReg));
       _participants[i].syncStart();
     }
-
-    _manager.connect();
-    _driver = new TaskDriver(_manager);
   }
 
   @Test
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java
index 43551f7..4cb5f41 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskStopQueue.java
@@ -21,21 +21,15 @@ package org.apache.helix.integration.task;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.helix.AccessOption;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 /**
@@ -45,19 +39,6 @@ import org.testng.annotations.Test;
 public class TestTaskStopQueue extends TaskTestBase {
   private static final long TIMEOUT = 200000L;
   private static final String EXECUTION_TIME = "100000";
-  private HelixAdmin _admin;
-  protected HelixManager _manager;
-  protected TaskDriver _driver;
-
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    super.beforeClass();
-    _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin",
-        InstanceType.ADMINISTRATOR, ZK_ADDR);
-    _manager.connect();
-    _driver = new TaskDriver(_manager);
-    _admin = _gSetupTool.getClusterManagementTool();
-  }
 
   @Test
   public void testStopRunningQueue() throws InterruptedException {
diff --git a/metadata-store-directory-common/src/test/java/org/apache/helix/msdcommon/mock/TestMockMetadataStoreDirectoryServer.java b/metadata-store-directory-common/src/test/java/org/apache/helix/msdcommon/mock/TestMockMetadataStoreDirectoryServer.java
index 015bc8c..1f58e19 100644
--- a/metadata-store-directory-common/src/test/java/org/apache/helix/msdcommon/mock/TestMockMetadataStoreDirectoryServer.java
+++ b/metadata-store-directory-common/src/test/java/org/apache/helix/msdcommon/mock/TestMockMetadataStoreDirectoryServer.java
@@ -21,13 +21,11 @@ package org.apache.helix.msdcommon.mock;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
 import org.apache.helix.msdcommon.constant.TestConstants;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -35,9 +33,8 @@ import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
-import org.junit.Test;
 import org.testng.Assert;
-
+import org.testng.annotations.Test;
 
 public class TestMockMetadataStoreDirectoryServer {
   @Test