You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/11/20 22:12:17 UTC

[02/52] [abbrv] [HELIX-279] Apply gc handling fixes to ZKHelixManager

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index be65ad1..00537a4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -30,13 +30,13 @@ import org.apache.helix.CurrentStateChangeListener;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.manager.ZkTestManager;
 import org.apache.helix.manager.zk.CallbackHandler;
 import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
@@ -63,15 +63,16 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
         2, // replicas
         "MasterSlave", true); // do rebalance
 
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -80,9 +81,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
             .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
                 clusterName));
     Assert.assertTrue(result);
-    final ZkHelixTestManager controllerManager = controller.getManager();
-    final ZkHelixTestManager participantManagerToExpire =
-        participants[1].getManager();
+    final MockParticipantManager participantManagerToExpire = participants[1];
 
     // check controller zk-watchers
     result = TestHelper.verify(new TestHelper.Verifier() {
@@ -90,7 +89,8 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
       @Override
       public boolean verify() throws Exception {
         Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR);
-        Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId());
+        // Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId());
+        Set<String> watchPaths = watchers.get("0x" + controller.getSessionId());
         // System.out.println("controller watch paths: " + watchPaths);
 
         // controller should have 5 + 2n + m + (m+2)n zk-watchers
@@ -118,7 +118,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     // check HelixManager#_handlers
     // printHandlers(controllerManager);
     // printHandlers(participantManagerToExpire);
-    int controllerHandlerNb = controllerManager.getHandlers().size();
+    int controllerHandlerNb = controller.getHandlers().size();
     int particHandlerNb = participantManagerToExpire.getHandlers().size();
     Assert.assertEquals(controllerHandlerNb, 9,
         "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant");
@@ -145,7 +145,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
       @Override
       public boolean verify() throws Exception {
         Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR);
-        Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId());
+        Set<String> watchPaths = watchers.get("0x" + controller.getSessionId());
         // System.out.println("controller watch paths after session expiry: " + watchPaths);
 
         // controller should have 5 + 2n + m + (m+2)n zk-watchers
@@ -173,13 +173,19 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
     // check handlers
     // printHandlers(controllerManager);
     // printHandlers(participantManagerToExpire);
-    int handlerNb = controllerManager.getHandlers().size();
+    int handlerNb = controller.getHandlers().size();
     Assert.assertEquals(handlerNb, controllerHandlerNb,
         "controller callback handlers should not increase after participant session expiry");
     handlerNb = participantManagerToExpire.getHandlers().size();
     Assert.assertEquals(handlerNb, particHandlerNb,
         "participant callback handlers should not increase after participant session expiry");
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+    }
+
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
@@ -202,15 +208,16 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
         2, // replicas
         "MasterSlave", true); // do rebalance
 
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -219,15 +226,16 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
             .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
                 clusterName));
     Assert.assertTrue(result);
-    final ZkHelixTestManager controllerManager = controller.getManager();
-    final ZkHelixTestManager participantManager = participants[0].getManager();
+    // final ZkHelixTestManager controllerManager = controller.getManager();
+    // final ZkHelixTestManager participantManager = participants[0].getManager();
+    final MockParticipantManager participantManager = participants[0];
 
     // wait until we get all the listeners registered
     result = TestHelper.verify(new TestHelper.Verifier() {
 
       @Override
       public boolean verify() throws Exception {
-        int controllerHandlerNb = controllerManager.getHandlers().size();
+        int controllerHandlerNb = controller.getHandlers().size();
         int particHandlerNb = participantManager.getHandlers().size();
         if (controllerHandlerNb == 9 && particHandlerNb == 2)
           return true;
@@ -236,21 +244,21 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
       }
     }, 1000);
 
-    int controllerHandlerNb = controllerManager.getHandlers().size();
+    int controllerHandlerNb = controller.getHandlers().size();
     int particHandlerNb = participantManager.getHandlers().size();
     Assert.assertEquals(controllerHandlerNb, 9,
         "HelixController should have 9 (5+2n) callback handlers for 2 participant, but was "
-            + controllerHandlerNb + ", " + printHandlers(controllerManager));
+            + controllerHandlerNb + ", " + printHandlers(controller));
     Assert.assertEquals(particHandlerNb, 2,
         "HelixParticipant should have 2 (msg+cur-state) callback handlers, but was "
             + particHandlerNb + ", " + printHandlers(participantManager));
 
     // expire controller
     System.out.println("Expiring controller session...");
-    String oldSessionId = controllerManager.getSessionId();
+    String oldSessionId = controller.getSessionId();
 
-    ZkTestHelper.expireSession(controllerManager.getZkClient());
-    String newSessionId = controllerManager.getSessionId();
+    ZkTestHelper.expireSession(controller.getZkClient());
+    String newSessionId = controller.getSessionId();
     System.out.println("Expired controller session. oldSessionId: " + oldSessionId
         + ", newSessionId: " + newSessionId);
 
@@ -265,7 +273,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
       @Override
       public boolean verify() throws Exception {
         Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR);
-        Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId());
+        Set<String> watchPaths = watchers.get("0x" + controller.getSessionId());
         // System.out.println("controller watch paths after session expiry: " + watchPaths);
 
         // controller should have 5 + 2n + m + (m+2)n zk-watchers
@@ -292,15 +300,21 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
 
     // check HelixManager#_handlers
     // printHandlers(controllerManager);
-    int handlerNb = controllerManager.getHandlers().size();
+    int handlerNb = controller.getHandlers().size();
     Assert.assertEquals(handlerNb, controllerHandlerNb,
         "controller callback handlers should not increase after participant session expiry, but was "
-            + printHandlers(controllerManager));
+            + printHandlers(controller));
     handlerNb = participantManager.getHandlers().size();
     Assert.assertEquals(handlerNb, particHandlerNb,
         "participant callback handlers should not increase after participant session expiry, but was "
             + printHandlers(participantManager));
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+    }
+
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
@@ -319,18 +333,20 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
         2, // replicas
         "MasterSlave", true);
 
-    ClusterController controller = new ClusterController(clusterName, "controller_0", zkAddr);
+    final ClusterControllerManager controller =
+        new ClusterControllerManager(zkAddr, clusterName, "controller_0");
     controller.syncStart();
 
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      participants[i] = new MockParticipant(clusterName, instanceName, zkAddr, null);
+      participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
       participants[i].syncStart();
 
       // register a controller listener on participant_0
       if (i == 0) {
-        ZkHelixTestManager manager = participants[0].getManager();
+        // ZkHelixTestManager manager = participants[0].getManager();
+        MockParticipantManager manager = participants[0];
         manager.addCurrentStateChangeListener(new CurrentStateChangeListener() {
           @Override
           public void onStateChange(String instanceName, List<CurrentState> statesInfo,
@@ -349,7 +365,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
                 clusterName));
     Assert.assertTrue(result);
 
-    ZkHelixTestManager participantToExpire = participants[0].getManager();
+    MockParticipantManager participantToExpire = participants[0];
     String oldSessionId = participantToExpire.getSessionId();
     PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
 
@@ -474,11 +490,18 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
             "Should have no exist-watches. exist-watches on CURRENTSTATE/{oldSessionId} and CURRENTSTATE/{oldSessionId}/TestDB0 should be cleared during handleNewSession");
 
     // Thread.sleep(1000);
+
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+    }
+
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
   // debug code
-  static String printHandlers(ZkHelixTestManager manager) {
+  static String printHandlers(ZkTestManager manager) {
     StringBuilder sb = new StringBuilder();
     List<CallbackHandler> handlers = manager.getHandlers();
     sb.append(manager.getInstanceName() + " has " + handlers.size() + " cb-handlers. [");

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
index d04fbfd..9188e61 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java
@@ -19,14 +19,13 @@ package org.apache.helix.integration;
  * under the License.
  */
 
-import java.util.Map;
 import java.util.logging.Level;
 
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -37,7 +36,6 @@ import org.apache.helix.model.builder.ConfigScopeBuilder;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.util.ZKClientPool;
 import org.apache.log4j.Logger;
-import org.testng.Assert;
 import org.testng.AssertJUnit;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
@@ -85,7 +83,7 @@ public class ZkIntegrationTestBase {
 
   protected String getCurrentLeader(ZkClient zkClient, String clusterName) {
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
@@ -95,49 +93,6 @@ public class ZkIntegrationTestBase {
     return leader.getInstanceName();
   }
 
-  /**
-   * Stop current leader and returns the new leader
-   * @param zkClient
-   * @param clusterName
-   * @param startCMResultMap
-   * @return
-   */
-  protected String stopCurrentLeader(ZkClient zkClient, String clusterName,
-      Map<String, StartCMResult> startCMResultMap) {
-    String leader = getCurrentLeader(zkClient, clusterName);
-    Assert.assertTrue(leader != null);
-    System.out.println("stop leader: " + leader + " in " + clusterName);
-    Assert.assertTrue(leader != null);
-
-    StartCMResult result = startCMResultMap.remove(leader);
-    Assert.assertTrue(result._manager != null);
-    result._manager.disconnect();
-
-    Assert.assertTrue(result._thread != null);
-    result._thread.interrupt();
-
-    boolean isNewLeaderElected = false;
-    String newLeader = null;
-    try {
-      for (int i = 0; i < 5; i++) {
-        Thread.sleep(1000);
-        newLeader = getCurrentLeader(zkClient, clusterName);
-        if (!newLeader.equals(leader)) {
-          isNewLeaderElected = true;
-          System.out.println("new leader elected: " + newLeader + " in " + clusterName);
-          break;
-        }
-      }
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-    if (isNewLeaderElected == false) {
-      System.out.println("fail to elect a new leader in " + clusterName);
-    }
-    AssertJUnit.assertTrue(isNewLeaderElected);
-    return newLeader;
-  }
-
   protected void enableHealthCheck(String clusterName) {
     ConfigScope scope = new ConfigScopeBuilder().forCluster(clusterName).build();
     new ConfigAccessor(_gZkClient).set(scope, "healthChange" + ".enabled", "" + true);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
index e759fc7..5d169d5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
@@ -20,16 +20,9 @@ package org.apache.helix.integration;
  */
 
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
+
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -57,8 +50,8 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase {
   protected final String CLASS_NAME = getShortClassName();
   protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
 
-  protected Map<String, StartCMResult> _startCMResultMap = new HashMap<String, StartCMResult>();
-  protected ZkClient _zkClient;
+  protected MockParticipantManager[] _participants = new MockParticipantManager[NODE_NR];
+  protected ClusterControllerManager _controller;
 
   int _replica = 3;
 
@@ -67,11 +60,9 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase {
     // Logger.getRootLogger().setLevel(Level.INFO);
     System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
 
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
     String namespace = "/" + CLUSTER_NAME;
-    if (_zkClient.exists(namespace)) {
-      _zkClient.deleteRecursive(namespace);
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
     }
     _setupTool = new ClusterSetup(ZK_ADDR);
 
@@ -87,21 +78,14 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase {
     // start dummy participants
     for (int i = 0; i < NODE_NR; i++) {
       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      if (_startCMResultMap.get(instanceName) != null) {
-        LOG.error("fail to start particpant:" + instanceName
-            + "(participant with same name already exists)");
-      } else {
-        StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
-        _startCMResultMap.put(instanceName, result);
-      }
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants[i].syncStart();
     }
 
     // start controller
     String controllerName = CONTROLLER_PREFIX + "_0";
-    StartCMResult startResult =
-        TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
-            HelixControllerMain.STANDALONE);
-    _startCMResultMap.put(controllerName, startResult);
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
 
     boolean result =
         ClusterStateVerifier
@@ -119,30 +103,11 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase {
      * shutdown order: 1) disconnect the controller 2) disconnect participants
      */
 
-    StartCMResult result;
-    Iterator<Entry<String, StartCMResult>> it = _startCMResultMap.entrySet().iterator();
-    while (it.hasNext()) {
-      String instanceName = it.next().getKey();
-      if (instanceName.startsWith(CONTROLLER_PREFIX)) {
-        result = _startCMResultMap.get(instanceName);
-        result._manager.disconnect();
-        result._thread.interrupt();
-        it.remove();
-      }
-    }
-
-    Thread.sleep(100);
-    it = _startCMResultMap.entrySet().iterator();
-    while (it.hasNext()) {
-      String instanceName = it.next().getKey();
-      result = _startCMResultMap.get(instanceName);
-      result._manager.disconnect();
-      result._thread.interrupt();
-      it.remove();
+    _controller.syncStop();
+    for (int i = 0; i < NODE_NR; i++) {
+      _participants[i].syncStop();
     }
 
-    _zkClient.close();
-    // logger.info("END at " + new Date(System.currentTimeMillis()));
     System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
index f19e5dd..c6fbea6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBaseWithPropertyServerCheck.java
@@ -23,8 +23,11 @@ import java.util.List;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
 import org.apache.helix.controller.restlet.ZkPropertyTransferClient;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.StatusUpdate;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -36,6 +39,7 @@ import org.testng.annotations.BeforeClass;
  */
 
 public class ZkStandAloneCMTestBaseWithPropertyServerCheck extends ZkStandAloneCMTestBase {
+  @Override
   @BeforeClass
   public void beforeClass() throws Exception {
     ZKPropertyTransferServer.PERIOD = 500;
@@ -44,19 +48,20 @@ public class ZkStandAloneCMTestBaseWithPropertyServerCheck extends ZkStandAloneC
     super.beforeClass();
 
     Thread.sleep(1000);
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    Builder kb = accessor.keyBuilder();
+
     for (int i = 0; i < NODE_NR; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      if (_startCMResultMap.get(instanceName) != null) {
-        HelixDataAccessor accessor =
-            _startCMResultMap.get(instanceName)._manager.getHelixDataAccessor();
-        Builder kb = accessor.keyBuilder();
-        List<StatusUpdate> statusUpdates =
-            accessor.getChildValues(kb.stateTransitionStatus(instanceName,
-                _startCMResultMap.get(instanceName)._manager.getSessionId(), TEST_DB));
+      String instanceName = _participants[i].getInstanceName();
+      List<StatusUpdate> statusUpdates =
+          accessor.getChildValues(kb.stateTransitionStatus(instanceName,
+              _participants[i].getSessionId(), TEST_DB));
+
         for (int j = 0; j < 10; j++) {
           statusUpdates =
               accessor.getChildValues(kb.stateTransitionStatus(instanceName,
-                  _startCMResultMap.get(instanceName)._manager.getSessionId(), TEST_DB));
+                _participants[i].getSessionId(), TEST_DB));
           if (statusUpdates.size() == 0) {
             Thread.sleep(500);
           } else {
@@ -70,10 +75,10 @@ public class ZkStandAloneCMTestBaseWithPropertyServerCheck extends ZkStandAloneC
           Assert
               .assertTrue(update.getRecord().getSimpleField(ZKPropertyTransferServer.SERVER) != null);
         }
-      }
     }
   }
 
+  @Override
   @AfterClass
   public void afterClass() throws Exception {
     super.afterClass();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
index e0da9fb..b8f0f2b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
@@ -22,12 +22,14 @@ package org.apache.helix.integration.manager;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.helix.HelixTimerTask;
+import org.apache.helix.InstanceType;
 import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ControllerManager;
+import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.log4j.Logger;
 
-public class ClusterControllerManager extends ControllerManager implements Runnable, ZkTestManager {
+public class ClusterControllerManager extends ZKHelixManager implements Runnable, ZkTestManager {
   private static Logger LOG = Logger.getLogger(ClusterControllerManager.class);
 
   private final CountDownLatch _startCountDown = new CountDownLatch(1);
@@ -35,7 +37,7 @@ public class ClusterControllerManager extends ControllerManager implements Runna
   private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
 
   public ClusterControllerManager(String zkAddr, String clusterName, String controllerName) {
-    super(zkAddr, clusterName, controllerName);
+    super(clusterName, controllerName, InstanceType.CONTROLLER, zkAddr);
   }
 
   public void syncStop() {
@@ -43,8 +45,7 @@ public class ClusterControllerManager extends ControllerManager implements Runna
     try {
       _waitStopFinishCountDown.await();
     } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error("Interrupted waiting for finish", e);
     }
   }
 
@@ -54,8 +55,7 @@ public class ClusterControllerManager extends ControllerManager implements Runna
     try {
       _startCountDown.await();
     } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error("Interrupted waiting for start", e);
     }
   }
 
@@ -84,4 +84,7 @@ public class ClusterControllerManager extends ControllerManager implements Runna
     return _handlers;
   }
 
+  public List<HelixTimerTask> getControllerTimerTasks() {
+    return _controllerTimerTasks;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
index 751c3cb..44d0957 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
@@ -22,12 +22,15 @@ package org.apache.helix.integration.manager;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.helix.InstanceType;
 import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.DistributedControllerManager;
+import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.participant.DistClusterControllerStateModelFactory;
+import org.apache.helix.participant.StateMachineEngine;
 import org.apache.log4j.Logger;
 
-public class ClusterDistributedController extends DistributedControllerManager implements Runnable,
+public class ClusterDistributedController extends ZKHelixManager implements Runnable,
     ZkTestManager {
   private static Logger LOG = Logger.getLogger(ClusterDistributedController.class);
 
@@ -36,7 +39,7 @@ public class ClusterDistributedController extends DistributedControllerManager i
   private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
 
   public ClusterDistributedController(String zkAddr, String clusterName, String controllerName) {
-    super(zkAddr, clusterName, controllerName);
+    super(clusterName, controllerName, InstanceType.CONTROLLER_PARTICIPANT, zkAddr);
   }
 
   public void syncStop() {
@@ -44,8 +47,7 @@ public class ClusterDistributedController extends DistributedControllerManager i
     try {
       _waitStopFinishCountDown.await();
     } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error("Interrupted waiting for finish", e);
     }
   }
 
@@ -55,14 +57,18 @@ public class ClusterDistributedController extends DistributedControllerManager i
     try {
       _startCountDown.await();
     } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error("Interrupted waiting for start", e);
     }
   }
 
   @Override
   public void run() {
     try {
+      StateMachineEngine stateMach = getStateMachineEngine();
+      DistClusterControllerStateModelFactory lsModelFactory =
+          new DistClusterControllerStateModelFactory(_zkAddress);
+      stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
+
       connect();
       _startCountDown.countDown();
       _stopCountDown.await();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 8249f4a..34efe34 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -22,18 +22,20 @@ package org.apache.helix.integration.manager;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.helix.InstanceType;
 import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
 import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
+import org.apache.helix.mock.participant.MockJobIntf;
 import org.apache.helix.mock.participant.MockMSModelFactory;
 import org.apache.helix.mock.participant.MockSchemataModelFactory;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.log4j.Logger;
 
-public class MockParticipantManager extends ParticipantManager implements Runnable, ZkTestManager {
+public class MockParticipantManager extends ZKHelixManager implements Runnable, ZkTestManager {
   private static Logger LOG = Logger.getLogger(MockParticipantManager.class);
 
   private final CountDownLatch _startCountDown = new CountDownLatch(1);
@@ -43,7 +45,7 @@ public class MockParticipantManager extends ParticipantManager implements Runnab
   private final MockMSModelFactory _msModelFactory = new MockMSModelFactory(null);
 
   public MockParticipantManager(String zkAddr, String clusterName, String instanceName) {
-    super(zkAddr, clusterName, instanceName);
+    super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
   }
 
   public void setTransition(MockTransition transition) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
index b5ef255..aa00a8d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
@@ -22,14 +22,16 @@ package org.apache.helix.integration.manager;
 import java.util.Date;
 import java.util.List;
 
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.DistributedControllerManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.mock.participant.MockMSModelFactory;
 import org.apache.helix.model.LiveInstance;
@@ -61,11 +63,12 @@ public class TestDistributedControllerManager extends ZkIntegrationTestBase {
         2, // replicas
         "MasterSlave", true); // do rebalance
 
-    DistributedControllerManager[] distributedControllers = new DistributedControllerManager[n];
+    HelixManager[] distributedControllers = new HelixManager[n];
     for (int i = 0; i < n; i++) {
       int port = 12918 + i;
       distributedControllers[i] =
-          new DistributedControllerManager(ZK_ADDR, clusterName, "localhost_" + port);
+          new ZKHelixManager(clusterName, "localhost_" + port, InstanceType.CONTROLLER_PARTICIPANT,
+              ZK_ADDR);
       distributedControllers[i].getStateMachineEngine().registerStateModelFactory("MasterSlave",
           new MockMSModelFactory());
       distributedControllers[i].connect();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
index a818fd3..82f583f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
@@ -24,6 +24,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathConfig;
@@ -33,9 +35,8 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.integration.ZkIntegrationTestBase;
-import org.apache.helix.manager.zk.ControllerManager;
-import org.apache.helix.manager.zk.ParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.mock.participant.MockMSModelFactory;
@@ -70,13 +71,14 @@ public class TestParticipantManager extends ZkIntegrationTestBase {
         1, // replicas
         "MasterSlave", true); // do rebalance
 
-    ParticipantManager participant =
-        new ParticipantManager(ZK_ADDR, clusterName, "localhost_12918");
+    HelixManager participant =
+        new ZKHelixManager(clusterName, "localhost_12918", InstanceType.PARTICIPANT, ZK_ADDR);
     participant.getStateMachineEngine().registerStateModelFactory("MasterSlave",
         new MockMSModelFactory());
     participant.connect();
 
-    ControllerManager controller = new ControllerManager(ZK_ADDR, clusterName, "controller_0");
+    HelixManager controller =
+        new ZKHelixManager(clusterName, "controller_0", InstanceType.CONTROLLER, ZK_ADDR);
     controller.connect();
 
     boolean result =
@@ -121,8 +123,9 @@ public class TestParticipantManager extends ZkIntegrationTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ControllerManager controller = new ControllerManager(ZK_ADDR, clusterName, "controller_0");
-    controller.connect();
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
@@ -151,7 +154,7 @@ public class TestParticipantManager extends ZkIntegrationTestBase {
     Assert.assertNotSame(newSessionId, oldSessionId);
 
     // cleanup
-    controller.disconnect();
+    controller.syncStop();
     for (int i = 0; i < n; i++) {
       participants[i].syncStop();
     }
@@ -207,8 +210,9 @@ public class TestParticipantManager extends ZkIntegrationTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ControllerManager controller = new ControllerManager(ZK_ADDR, clusterName, "controller_0");
-    controller.connect();
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
@@ -245,7 +249,7 @@ public class TestParticipantManager extends ZkIntegrationTestBase {
     Assert.assertTrue(errString.indexOf("InterruptedException") != -1);
 
     // cleanup
-    controller.disconnect();
+    controller.syncStop();
     for (int i = 0; i < n; i++) {
       participants[i].syncStop();
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java b/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
index 1b9ee62..2aa4544 100644
--- a/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
+++ b/helix-core/src/test/java/org/apache/helix/josql/TestJosqlProcessor.java
@@ -40,8 +40,8 @@ public class TestJosqlProcessor extends ZkStandAloneCMTestBase {
     "integrationTest"
   })
   public void testJosqlQuery() throws Exception {
-    HelixManager manager =
-        ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
+    HelixManager manager = _participants[0];
+    // ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
 
     // Find the instance name that contains partition TestDB_2 and state is 'MASTER'
     String SQL =
@@ -183,8 +183,8 @@ public class TestJosqlProcessor extends ZkStandAloneCMTestBase {
 
   @Test(groups = ("unitTest"))
   public void testOrderby() throws Exception {
-    HelixManager manager =
-        ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
+    HelixManager manager = _participants[0];
+    // ((TestHelper.StartCMResult) (_startCMResultMap.values().toArray()[0]))._manager;
 
     Map<String, ZNRecord> scnMap = new HashMap<String, ZNRecord>();
     for (int i = 0; i < NODE_NR; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java
index 4cef5a0..8b5b30c 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestDefaultControllerMsgHandlerFactory.java
@@ -30,10 +30,13 @@ import org.apache.helix.manager.zk.DefaultControllerMessageHandlerFactory.Defaul
 import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
 public class TestDefaultControllerMsgHandlerFactory {
+  private static Logger LOG = Logger.getLogger(TestDefaultControllerMsgHandlerFactory.class);
+
   @Test()
   public void testDefaultControllerMsgHandlerFactory() {
     System.out.println("START TestDefaultControllerMsgHandlerFactory at "
@@ -70,8 +73,7 @@ public class TestDefaultControllerMsgHandlerFactory {
     } catch (HelixException e) {
       exceptionCaught = true;
     } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error("Interrupted handling message", e);
     }
     AssertJUnit.assertTrue(exceptionCaught);
 
@@ -83,8 +85,7 @@ public class TestDefaultControllerMsgHandlerFactory {
     } catch (HelixException e) {
       exceptionCaught = true;
     } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error("Interrupted handling message", e);
     }
     AssertJUnit.assertFalse(exceptionCaught);
     System.out.println("END TestDefaultControllerMsgHandlerFactory at "

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
index 3cca10c..352cdd5 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
@@ -21,10 +21,10 @@ package org.apache.helix.manager.zk;
 
 import java.util.Date;
 
-import org.apache.helix.InstanceType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -47,17 +47,17 @@ public class TestHandleNewSession extends ZkIntegrationTestBase {
         3, // replicas
         "MasterSlave", true); // do rebalance
 
-    ZKHelixManager manager =
-        new ZKHelixManager(clusterName, "localhost_12918", InstanceType.PARTICIPANT, ZK_ADDR);
-    manager.connect();
+    MockParticipantManager participant =
+        new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12918");
+    participant.syncStart();
 
     // Logger.getRootLogger().setLevel(Level.INFO);
-    String lastSessionId = manager.getSessionId();
+    String lastSessionId = participant.getSessionId();
     for (int i = 0; i < 3; i++) {
       // System.err.println("curSessionId: " + lastSessionId);
-      ZkTestHelper.expireSession(manager._zkClient);
+      ZkTestHelper.expireSession(participant.getZkClient());
 
-      String sessionId = manager.getSessionId();
+      String sessionId = participant.getSessionId();
       Assert.assertTrue(sessionId.compareTo(lastSessionId) > 0,
           "Session id should be increased after expiry");
       lastSessionId = sessionId;
@@ -71,7 +71,7 @@ public class TestHandleNewSession extends ZkIntegrationTestBase {
 
     // Logger.getRootLogger().setLevel(Level.INFO);
     System.out.println("Disconnecting ...");
-    manager.disconnect();
+    participant.syncStop();
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
index a49d655..547e863 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
@@ -19,10 +19,8 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.ZkHelixTestManager;
 import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -30,24 +28,20 @@ import org.testng.annotations.Test;
 public class TestLiveInstanceBounce extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
   @Test
   public void testInstanceBounce() throws Exception {
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    StartCMResult controllerResult = _startCMResultMap.get(controllerName);
-    ZkHelixTestManager controller = controllerResult._manager;
-    int handlerSize = controller.getHandlers().size();
+    int handlerSize = _controller.getHandlers().size();
 
     for (int i = 0; i < 2; i++) {
       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
       // kill 2 participants
-      _startCMResultMap.get(instanceName)._manager.disconnect();
-      _startCMResultMap.get(instanceName)._thread.interrupt();
+      _participants[i].syncStop();
       try {
         Thread.sleep(1000);
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
       // restart the participant
-      StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
-      _startCMResultMap.put(instanceName, result);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants[i].syncStart();
       Thread.sleep(100);
     }
     Thread.sleep(4000);
@@ -61,11 +55,11 @@ public class TestLiveInstanceBounce extends ZkStandAloneCMTestBaseWithPropertySe
     // and we will remove current-state listener on expired session
     // so the number of callback handlers is unchanged
     for (int j = 0; j < 10; j++) {
-      if (controller.getHandlers().size() == (handlerSize)) {
+      if (_controller.getHandlers().size() == (handlerSize)) {
         break;
       }
       Thread.sleep(400);
     }
-    Assert.assertEquals(controller.getHandlers().size(), handlerSize);
+    Assert.assertEquals(_controller.getHandlers().size(), handlerSize);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
index c85f207..50a9a78 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKPropertyTransferServer.java
@@ -20,11 +20,10 @@ package org.apache.helix.manager.zk;
  */
 
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
 import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -35,34 +34,28 @@ public class TestZKPropertyTransferServer extends ZkStandAloneCMTestBaseWithProp
   @Test
   public void TestControllerChange() throws Exception {
     String controllerName = CONTROLLER_PREFIX + "_0";
-    _startCMResultMap.get(controllerName)._manager.disconnect();
+    _controller.syncStop();
 
     Thread.sleep(1000);
 
     // kill controller, participant should not know about the svc url
     for (int i = 0; i < NODE_NR; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
       HelixDataAccessor accessor =
-          _startCMResultMap.get(instanceName)._manager.getHelixDataAccessor();
+          _participants[i].getHelixDataAccessor();
       ZKHelixDataAccessor zkAccessor = (ZKHelixDataAccessor) accessor;
       Assert.assertTrue(zkAccessor._zkPropertyTransferSvcUrl == null
           || zkAccessor._zkPropertyTransferSvcUrl.equals(""));
     }
-    _startCMResultMap.get(controllerName)._thread.interrupt();
-    _startCMResultMap.remove(controllerName);
 
-    StartCMResult startResult =
-        TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
-            HelixControllerMain.STANDALONE);
-    _startCMResultMap.put(controllerName, startResult);
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
 
     Thread.sleep(1000);
 
     // create controller again, the svc url is notified to the participants
     for (int i = 0; i < NODE_NR; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
       HelixDataAccessor accessor =
-          _startCMResultMap.get(instanceName)._manager.getHelixDataAccessor();
+          _participants[i].getHelixDataAccessor();
       ZKHelixDataAccessor zkAccessor = (ZKHelixDataAccessor) accessor;
       Assert.assertTrue(zkAccessor._zkPropertyTransferSvcUrl.equals(ZKPropertyTransferServer
           .getInstance().getWebserviceUrl()));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
index c099232..83dc986 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
@@ -27,6 +27,12 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.AccessOption;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ConfigScope;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
@@ -35,14 +41,10 @@ import org.apache.helix.LiveInstanceInfoProvider;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.ZkHelixTestManager;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.manager.MockListener;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
@@ -65,6 +67,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
 
     ZKHelixManager controller =
         new ZKHelixManager(clusterName, null, InstanceType.CONTROLLER, ZK_ADDR);
+
     try {
       controller.connect();
       Assert.fail("Should throw HelixException if initial cluster structure is not setup");
@@ -193,8 +196,9 @@ public class TestZkClusterManager extends ZkUnitTestBase {
 
     // //////////////////////////////////
 
-    ZkHelixTestManager manager2 =
-        new ZkHelixTestManager(clusterName, "localhost_3", InstanceType.PARTICIPANT, ZK_ADDR);
+    MockParticipantManager manager2 =
+        new MockParticipantManager(ZK_ADDR, clusterName, "localhost_3");
+
     manager2.setLiveInstanceInfoProvider(new provider(true));
 
     manager2.connect();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
index 5b35148..c329e9d 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
@@ -13,8 +13,8 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.TestHelper.Verifier;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.LiveInstance;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.testng.Assert;
@@ -134,10 +134,11 @@ public class TestZkFlapping extends ZkUnitTestBase {
         "MasterSlave", false);
 
     final String instanceName = "localhost_12918";
-    MockParticipant participant = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+    MockParticipantManager participant =
+        new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
     participant.syncStart();
 
-    final ZkClient client = participant.getManager().getZkClient();
+    final ZkClient client = participant.getZkClient();
     final ZkStateCountListener listener = new ZkStateCountListener();
     client.subscribeStateChanges(listener);
 
@@ -212,10 +213,11 @@ public class TestZkFlapping extends ZkUnitTestBase {
         1, // replicas
         "MasterSlave", false);
 
-    ClusterController controller = new ClusterController(clusterName, "controller", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
     controller.syncStart();
 
-    final ZkClient client = controller.getManager().getZkClient();
+    final ZkClient client = controller.getZkClient();
     final ZkStateCountListener listener = new ZkStateCountListener();
     client.subscribeStateChanges(listener);
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java
index 249fcea..a62e39d 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkManagerFlappingDetection.java
@@ -19,13 +19,12 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import java.util.UUID;
 
-import org.apache.helix.InstanceType;
 import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -46,8 +45,8 @@ public class TestZkManagerFlappingDetection extends ZkIntegrationTestBase {
         "MasterSlave", true); // do rebalance
 
     String instanceName = "localhost_" + (12918 + 0);
-    ZkHelixTestManager manager =
-        new ZkHelixTestManager(clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
+    MockParticipantManager manager = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+
     manager.connect();
     ZkClient zkClient = manager.getZkClient();
     ZkTestHelper.expireSession(zkClient);
@@ -69,58 +68,59 @@ public class TestZkManagerFlappingDetection extends ZkIntegrationTestBase {
     Assert.assertFalse(manager.isConnected());
   }
 
-  @Test(enabled = false)
-  public void testDisconnectFlappingWindow() throws Exception {
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String instanceName = "localhost_" + (12918 + 1);
-    final String clusterName = className + "_" + methodName + UUID.randomUUID();
-
-    testDisconnectFlappingWindow2(instanceName, InstanceType.PARTICIPANT);
-    testDisconnectFlappingWindow2("admin", InstanceType.ADMINISTRATOR);
-  }
-
-  public void testDisconnectFlappingWindow2(String instanceName, InstanceType type)
-      throws Exception {
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    final String clusterName = className + "_" + methodName + UUID.randomUUID();
-
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
-        "localhost", // participant name prefix
-        "TestDB", // resource name prefix
-        1, // resources
-        10, // partitions per resource
-        5, // number of nodes
-        3, // replicas
-        "MasterSlave", true); // do rebalance
-
-    // flapping time window to 5 sec
-    System.setProperty("helixmanager.flappingTimeWindow", "15000");
-    System.setProperty("helixmanager.maxDisconnectThreshold", "7");
-    ZkHelixTestManager manager2 = new ZkHelixTestManager(clusterName, instanceName, type, ZK_ADDR);
-    manager2.connect();
-    ZkClient zkClient = manager2.getZkClient();
-    for (int i = 0; i < 3; i++) {
-      ZkTestHelper.expireSession(zkClient);
-      Thread.sleep(500);
-      Assert.assertTrue(manager2.isConnected());
-    }
-    Thread.sleep(15000);
-    // Old entries should be cleaned up
-    for (int i = 0; i < 7; i++) {
-      ZkTestHelper.expireSession(zkClient);
-      Thread.sleep(1000);
-      Assert.assertTrue(manager2.isConnected());
-    }
-    ZkTestHelper.disconnectSession(zkClient);
-    for (int i = 0; i < 20; i++) {
-      Thread.sleep(500);
-      if (!manager2.isConnected())
-        break;
-    }
-    Assert.assertFalse(manager2.isConnected());
-  }
+  // TODO test was disabled. check if it is still needed
+  // @Test(enabled = false)
+  // public void testDisconnectFlappingWindow() throws Exception {
+  // String className = TestHelper.getTestClassName();
+  // String methodName = TestHelper.getTestMethodName();
+  // String instanceName = "localhost_" + (12918 + 1);
+  // final String clusterName = className + "_" + methodName + UUID.randomUUID();
+  //
+  // testDisconnectFlappingWindow2(instanceName, InstanceType.PARTICIPANT);
+  // testDisconnectFlappingWindow2("admin", InstanceType.ADMINISTRATOR);
+  // }
+  //
+  // public void testDisconnectFlappingWindow2(String instanceName, InstanceType type)
+  // throws Exception {
+  // String className = TestHelper.getTestClassName();
+  // String methodName = TestHelper.getTestMethodName();
+  // final String clusterName = className + "_" + methodName + UUID.randomUUID();
+  //
+  // TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+  // "localhost", // participant name prefix
+  // "TestDB", // resource name prefix
+  // 1, // resources
+  // 10, // partitions per resource
+  // 5, // number of nodes
+  // 3, // replicas
+  // "MasterSlave", true); // do rebalance
+  //
+  // // flapping time window to 5 sec
+  // System.setProperty("helixmanager.flappingTimeWindow", "15000");
+  // System.setProperty("helixmanager.maxDisconnectThreshold", "7");
+  // ZkHelixTestManager manager2 = new ZkHelixTestManager(clusterName, instanceName, type, ZK_ADDR);
+  // manager2.connect();
+  // ZkClient zkClient = manager2.getZkClient();
+  // for (int i = 0; i < 3; i++) {
+  // ZkTestHelper.expireSession(zkClient);
+  // Thread.sleep(500);
+  // Assert.assertTrue(manager2.isConnected());
+  // }
+  // Thread.sleep(15000);
+  // // Old entries should be cleaned up
+  // for (int i = 0; i < 7; i++) {
+  // ZkTestHelper.expireSession(zkClient);
+  // Thread.sleep(1000);
+  // Assert.assertTrue(manager2.isConnected());
+  // }
+  // ZkTestHelper.disconnectSession(zkClient);
+  // for (int i = 0; i < 20; i++) {
+  // Thread.sleep(500);
+  // if (!manager2.isConnected())
+  // break;
+  // }
+  // Assert.assertFalse(manager2.isConnected());
+  // }
 
   // @Test
   public void testDisconnectFlappingWindowController() throws Exception {
@@ -140,8 +140,7 @@ public class TestZkManagerFlappingDetection extends ZkIntegrationTestBase {
     // flapping time window to 5 sec
     System.setProperty("helixmanager.flappingTimeWindow", "5000");
     System.setProperty("helixmanager.maxDisconnectThreshold", "3");
-    ZkHelixTestManager manager2 =
-        new ZkHelixTestManager(clusterName, null, InstanceType.CONTROLLER, ZK_ADDR);
+    ClusterControllerManager manager2 = new ClusterControllerManager(ZK_ADDR, clusterName, null);
     manager2.connect();
     Thread.sleep(100);
     ZkClient zkClient = manager2.getZkClient();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
index 4f15f90..aeb32f9 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkStateChangeListener.java
@@ -19,68 +19,70 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestZkStateChangeListener extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
-  @Test
+  // TODO: this test is already covered by TestZkFlapping; check whether it is still needed
+  // @Test
   public void testDisconnectHistory() throws Exception {
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    StartCMResult controllerResult = _startCMResultMap.get(controllerName);
-    ZKHelixManager controller = controllerResult._manager;
-    ZkStateChangeListener listener1 = new ZkStateChangeListener(controller, 5000, 10);
+    // String controllerName = CONTROLLER_PREFIX + "_0";
+    // StartCMResult controllerResult = _startCMResultMap.get(controllerName);
+    // ZKHelixManager controller = (ZKHelixManager) controllerResult._manager;
+    // ZkStateChangeListener listener1 = new ZkStateChangeListener(controller, 5000, 10);
+    // ZkStateChangeListener listener1 = new ZkStateChangeListener(_controller, 5000, 10);
+
     // 11 disconnects in 5 sec
     for (int i = 0; i < 11; i++) {
       Thread.sleep(200);
-      listener1.handleStateChanged(KeeperState.Disconnected);
+      _controller.handleStateChanged(KeeperState.Disconnected);
       if (i < 10) {
-        Assert.assertTrue(controller.isConnected());
+        Assert.assertTrue(_controller.isConnected());
       } else {
-        Assert.assertFalse(controller.isConnected());
+        Assert.assertFalse(_controller.isConnected());
       }
     }
 
     // If maxDisconnectThreshold is 0 it should be set to 1
-    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
-    ZKHelixManager manager = _startCMResultMap.get(instanceName)._manager;
+    // String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
+    // ZKHelixManager manager = (ZKHelixManager) _startCMResultMap.get(instanceName)._manager;
 
-    ZkStateChangeListener listener2 = new ZkStateChangeListener(manager, 5000, 0);
+    // ZkStateChangeListener listener2 = new ZkStateChangeListener(_participants[0], 5000, 0);
     for (int i = 0; i < 2; i++) {
       Thread.sleep(200);
-      listener2.handleStateChanged(KeeperState.Disconnected);
+      _participants[0].handleStateChanged(KeeperState.Disconnected);
       if (i < 1) {
-        Assert.assertTrue(manager.isConnected());
+        Assert.assertTrue(_participants[0].isConnected());
       } else {
-        Assert.assertFalse(manager.isConnected());
+        Assert.assertFalse(_participants[0].isConnected());
       }
     }
 
     // If there are long time after disconnect, older history should be cleanup
-    instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 1);
-    manager = _startCMResultMap.get(instanceName)._manager;
+    // instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 1);
+    // manager = (ZKHelixManager) _startCMResultMap.get(instanceName)._manager;
 
-    ZkStateChangeListener listener3 = new ZkStateChangeListener(manager, 5000, 5);
+    // ZkStateChangeListener listener3 = new ZkStateChangeListener(_participants[1], 5000, 5);
     for (int i = 0; i < 3; i++) {
       Thread.sleep(200);
-      listener3.handleStateChanged(KeeperState.Disconnected);
-      Assert.assertTrue(manager.isConnected());
+      _participants[1].handleStateChanged(KeeperState.Disconnected);
+      Assert.assertTrue(_participants[1].isConnected());
     }
     Thread.sleep(5000);
     // Old entries should be cleaned up
     for (int i = 0; i < 3; i++) {
       Thread.sleep(200);
-      listener3.handleStateChanged(KeeperState.Disconnected);
-      Assert.assertTrue(manager.isConnected());
+      _participants[1].handleStateChanged(KeeperState.Disconnected);
+      Assert.assertTrue(_participants[1].isConnected());
     }
     for (int i = 0; i < 2; i++) {
       Thread.sleep(200);
-      listener3.handleStateChanged(KeeperState.Disconnected);
-      Assert.assertTrue(manager.isConnected());
+      _participants[1].handleStateChanged(KeeperState.Disconnected);
+      Assert.assertTrue(_participants[1].isConnected());
     }
-    listener3.handleStateChanged(KeeperState.Disconnected);
-    Assert.assertFalse(manager.isConnected());
+    _participants[1].handleStateChanged(KeeperState.Disconnected);
+    Assert.assertFalse(_participants[1].isConnected());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
index f4566a0..c71a782 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
@@ -77,7 +77,7 @@ public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase {
   @Test
   public void TestThreadPoolSizeConfig() {
     String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
-    HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+    HelixManager manager = _participants[0];
 
     ConfigAccessor accessor = manager.getConfigAccessor();
     ConfigScope scope =
@@ -91,9 +91,9 @@ public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase {
     for (int i = 0; i < NODE_NR; i++) {
       instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
 
-      _startCMResultMap.get(instanceName)._manager.getMessagingService()
-          .registerMessageHandlerFactory("TestMsg", new TestMessagingHandlerFactory());
-      _startCMResultMap.get(instanceName)._manager.getMessagingService()
+      _participants[i].getMessagingService().registerMessageHandlerFactory("TestMsg",
+          new TestMessagingHandlerFactory());
+      _participants[i].getMessagingService()
           .registerMessageHandlerFactory("TestMsg2", new TestMessagingHandlerFactory2());
 
     }
@@ -102,7 +102,7 @@ public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase {
       instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
 
       DefaultMessagingService svc =
-          (DefaultMessagingService) (_startCMResultMap.get(instanceName)._manager
+          (DefaultMessagingService) (_participants[i]
               .getMessagingService());
       HelixTaskExecutor helixExecutor = svc.getExecutor();
       ThreadPoolExecutor executor =

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
index 9104866..a5777ab 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
@@ -35,8 +35,7 @@ import org.testng.annotations.Test;
 public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
   @Test
   public void TestThreadPoolSizeConfig() {
-    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
-    HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+    HelixManager manager = _participants[0];
     ConfigAccessor accessor = manager.getConfigAccessor();
     ConfigScope scope =
         new ConfigScopeBuilder().forCluster(manager.getClusterName()).forResource("NextDB").build();
@@ -52,11 +51,8 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
 
     long taskcount = 0;
     for (int i = 0; i < NODE_NR; i++) {
-      instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-
       DefaultMessagingService svc =
-          (DefaultMessagingService) (_startCMResultMap.get(instanceName)._manager
-              .getMessagingService());
+          (DefaultMessagingService) (_participants[i].getMessagingService());
       HelixTaskExecutor helixExecutor = svc.getExecutor();
       ThreadPoolExecutor executor =
           (ThreadPoolExecutor) (helixExecutor._executorMap.get(MessageType.STATE_TRANSITION + "."

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java b/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
deleted file mode 100644
index a04a213..0000000
--- a/helix-core/src/test/java/org/apache/helix/mock/controller/ClusterController.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package org.apache.helix.mock.controller;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.helix.InstanceType;
-import org.apache.helix.ZkHelixTestManager;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.participant.DistClusterControllerStateModelFactory;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.log4j.Logger;
-
-public class ClusterController extends Thread {
-  private static Logger LOG = Logger.getLogger(ClusterController.class);
-
-  private final CountDownLatch _startCountDown = new CountDownLatch(1);
-  private final CountDownLatch _stopCountDown = new CountDownLatch(1);
-  private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
-  private final String _controllerMode;
-  private final String _zkAddr;
-
-  private ZkHelixTestManager _manager;
-
-  public ClusterController(String clusterName, String controllerName, String zkAddr)
-      throws Exception {
-    this(clusterName, controllerName, zkAddr, HelixControllerMain.STANDALONE.toString());
-  }
-
-  public ClusterController(String clusterName, String controllerName, String zkAddr,
-      String controllerMode) throws Exception {
-    _controllerMode = controllerMode;
-    _zkAddr = zkAddr;
-
-    if (_controllerMode.equals(HelixControllerMain.STANDALONE.toString())) {
-      _manager =
-          new ZkHelixTestManager(clusterName, controllerName, InstanceType.CONTROLLER, zkAddr);
-    } else if (_controllerMode.equals(HelixControllerMain.DISTRIBUTED.toString())) {
-      _manager =
-          new ZkHelixTestManager(clusterName, controllerName, InstanceType.CONTROLLER_PARTICIPANT,
-              zkAddr);
-    } else {
-      throw new IllegalArgumentException("Controller mode: " + controllerMode + " NOT recoginized");
-    }
-  }
-
-  public ZkHelixTestManager getManager() {
-    return _manager;
-  }
-
-  public void syncStop() {
-    if (_manager == null) {
-      LOG.warn("manager already stopped");
-      return;
-    }
-
-    _stopCountDown.countDown();
-    try {
-      _waitStopFinishCountDown.await();
-    } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-  }
-
-  public void syncStart() {
-    // TODO: prevent start multiple times
-
-    super.start();
-    try {
-      _startCountDown.await();
-    } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-  }
-
-  @Override
-  public void run() {
-    try {
-      try {
-        if (_controllerMode.equals(HelixControllerMain.STANDALONE.toString())) {
-          _manager.connect();
-        } else if (_controllerMode.equals(HelixControllerMain.DISTRIBUTED.toString())) {
-          DistClusterControllerStateModelFactory stateModelFactory =
-              new DistClusterControllerStateModelFactory(_zkAddr);
-
-          StateMachineEngine stateMach = _manager.getStateMachineEngine();
-          stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
-          _manager.connect();
-        }
-      } catch (Exception e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      } finally {
-        _startCountDown.countDown();
-        _stopCountDown.await();
-      }
-    } catch (Exception e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    } finally {
-      synchronized (_manager) {
-        _manager.disconnect();
-        _manager = null;
-      }
-      _waitStopFinishCountDown.countDown();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java
index 59d9a0a..31811bb 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockHealthReportParticipant.java
@@ -33,6 +33,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.healthcheck.HealthReportProvider;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.log4j.Logger;
 
 public class MockHealthReportParticipant {
@@ -209,9 +210,9 @@ public class MockHealthReportParticipant {
 
   // NOT working for kill -9, working for kill -2/-15
   static class MockHealthReportParticipantShutdownHook extends Thread {
-    final MockParticipant _participant;
+    final MockParticipantManager _participant;
 
-    MockHealthReportParticipantShutdownHook(MockParticipant participant) {
+    MockHealthReportParticipantShutdownHook(MockParticipantManager participant) {
       _participant = participant;
     }
 
@@ -231,12 +232,11 @@ public class MockHealthReportParticipant {
 
     String instanceName = hostStr + "_" + portStr;
 
-    MockParticipant participant =
-        new MockParticipant(clusterName, instanceName, zkConnectStr, null, // new
-                                                                           // StoreAccessDiffNodeTransition(),
-                                                                           // // new
-                                                                           // StoreAccessOneNodeTransition(),
-            new MockHealthReportJob());
+    MockParticipantManager participant =
+        new MockParticipantManager(zkConnectStr, clusterName, instanceName);
+    // participant.setTransition(new StoreAccessDiffNodeTransition());
+    // participant.setTransition(new StoreAccessOneNodeTransition());
+    // new MockHealthReportJob());
     Runtime.getRuntime().addShutdownHook(new MockHealthReportParticipantShutdownHook(participant));
 
     // Espresso_driver.py will consume this

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/mock/participant/MockParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockParticipant.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockParticipant.java
deleted file mode 100644
index 4030b99..0000000
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockParticipant.java
+++ /dev/null
@@ -1,181 +0,0 @@
-package org.apache.helix.mock.participant;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.helix.InstanceType;
-import org.apache.helix.ZkHelixTestManager;
-import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
-import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.log4j.Logger;
-
-public class MockParticipant extends Thread {
-  private static Logger LOG = Logger.getLogger(MockParticipant.class);
-  private final String _clusterName;
-  private final String _instanceName;
-  // private final String _zkAddr;
-
-  private final CountDownLatch _startCountDown = new CountDownLatch(1);
-  private final CountDownLatch _stopCountDown = new CountDownLatch(1);
-  private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
-
-  private final ZkHelixTestManager _manager;
-  private final StateModelFactory _msModelFactory;
-  private final MockJobIntf _job;
-
-  public MockParticipant(String clusterName, String instanceName, String zkAddr) throws Exception {
-    this(clusterName, instanceName, zkAddr, null, null);
-  }
-
-  public MockParticipant(String clusterName, String instanceName, String zkAddr,
-      MockTransition transition) throws Exception {
-    this(clusterName, instanceName, zkAddr, transition, null);
-  }
-
-  public MockParticipant(String clusterName, String instanceName, String zkAddr,
-      MockTransition transition, MockJobIntf job) throws Exception {
-    _clusterName = clusterName;
-    _instanceName = instanceName;
-    _msModelFactory = new MockMSModelFactory(transition);
-
-    _manager =
-        new ZkHelixTestManager(_clusterName, _instanceName, InstanceType.PARTICIPANT, zkAddr);
-    _job = job;
-  }
-
-  public MockParticipant(StateModelFactory factory, String clusterName, String instanceName,
-      String zkAddr, MockJobIntf job) throws Exception {
-    _clusterName = clusterName;
-    _instanceName = instanceName;
-    _msModelFactory = factory;
-
-    _manager =
-        new ZkHelixTestManager(_clusterName, _instanceName, InstanceType.PARTICIPANT, zkAddr);
-    _job = job;
-  }
-
-  public StateModelFactory getStateModelFactory() {
-    return _msModelFactory;
-  }
-
-  public MockParticipant(ZkHelixTestManager manager, MockTransition transition) {
-    _clusterName = manager.getClusterName();
-    _instanceName = manager.getInstanceName();
-    _manager = manager;
-
-    _msModelFactory = new MockMSModelFactory(transition);
-    _job = null;
-  }
-
-  public void setTransition(MockTransition transition) {
-    if (_msModelFactory instanceof MockMSModelFactory) {
-      ((MockMSModelFactory) _msModelFactory).setTrasition(transition);
-    }
-  }
-
-  public ZkHelixTestManager getManager() {
-    return _manager;
-  }
-
-  public String getInstanceName() {
-    return _instanceName;
-  }
-
-  public String getClusterName() {
-    return _clusterName;
-  }
-
-  public void syncStop() {
-    _stopCountDown.countDown();
-    try {
-      _waitStopFinishCountDown.await();
-    } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-
-    // synchronized (_manager)
-    // {
-    // _manager.disconnect();
-    // }
-  }
-
-  public void syncStart() {
-    super.start();
-    try {
-      _startCountDown.await();
-    } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-  }
-
-  @Override
-  public void run() {
-    try {
-      StateMachineEngine stateMach = _manager.getStateMachineEngine();
-      stateMach.registerStateModelFactory("MasterSlave", _msModelFactory);
-
-      DummyLeaderStandbyStateModelFactory lsModelFactory =
-          new DummyLeaderStandbyStateModelFactory(10);
-      DummyOnlineOfflineStateModelFactory ofModelFactory =
-          new DummyOnlineOfflineStateModelFactory(10);
-      stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
-      stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory);
-
-      MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory();
-      stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);
-      // MockBootstrapModelFactory bootstrapFactory = new MockBootstrapModelFactory();
-      // stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory);
-
-      if (_job != null) {
-        _job.doPreConnectJob(_manager);
-      }
-
-      _manager.connect();
-      _startCountDown.countDown();
-
-      if (_job != null) {
-        _job.doPostConnectJob(_manager);
-      }
-
-      _stopCountDown.await();
-    } catch (InterruptedException e) {
-      String msg =
-          "participant: " + _instanceName + ", " + Thread.currentThread().getName()
-              + " is interrupted";
-      LOG.info(msg);
-      System.err.println(msg);
-    } catch (Exception e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    } finally {
-      _startCountDown.countDown();
-
-      synchronized (_manager) {
-        _manager.disconnect();
-      }
-      _waitStopFinishCountDown.countDown();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index 6de77b2..d97b22a 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -261,4 +261,10 @@ public class MockZKHelixManager implements HelixManager {
     return null;
   }
 
+  @Override
+  public void addControllerMessageListener(MessageListener listener) {
+    // TODO: not implemented yet; this mock currently ignores the listener.
+
+  }
+
 }