You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/10/25 03:21:16 UTC

[03/10] [HELIX-279] Apply gc handling fixes to main ZKHelixManager class

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
index 2354ebd..457b5fb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
@@ -87,8 +87,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     String hostDest = "localhost_" + (START_PORT + 1);
 
     TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
-    _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
-        factory.getMessageType(), factory);
+    _participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+        factory);
 
     String msgId = new UUID(123, 456).toString();
     Message msg = new Message(factory.getMessageType(), msgId);
@@ -104,7 +104,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
     cr.setSessionSpecific(false);
 
-    int nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+    // int nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+    int nMsgs = _participants[0].getMessagingService().send(cr, msg);
     AssertJUnit.assertTrue(nMsgs == 1);
     Thread.sleep(2500);
     // Thread.currentThread().join();
@@ -116,7 +117,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     cr.setSessionSpecific(false);
     cr.setDataSource(DataSource.IDEALSTATES);
 
-    nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+    // nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+    nMsgs = _participants[0].getMessagingService().send(cr, msg);
     AssertJUnit.assertTrue(nMsgs == 1);
     Thread.sleep(2500);
     // Thread.currentThread().join();
@@ -179,11 +181,11 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     String hostDest = "localhost_" + (START_PORT + 1);
 
     TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
-    _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
-        factory.getMessageType(), factory);
+    _participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+        factory);
 
-    _startCMResultMap.get(hostSrc)._manager.getMessagingService().registerMessageHandlerFactory(
-        factory.getMessageType(), factory);
+    _participants[0].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+        factory);
 
     String msgId = new UUID(123, 456).toString();
     Message msg = new Message(factory.getMessageType(), msgId);
@@ -202,7 +204,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
 
     TestAsyncCallback callback = new TestAsyncCallback(60000);
 
-    _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback, 60000);
+    _participants[0].getMessagingService().send(cr, msg, callback, 60000);
 
     Thread.sleep(2000);
     // Thread.currentThread().join();
@@ -210,7 +212,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     AssertJUnit.assertTrue(callback.getMessageReplied().size() == 1);
 
     TestAsyncCallback callback2 = new TestAsyncCallback(500);
-    _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback2, 500);
+    _participants[0].getMessagingService().send(cr, msg, callback2, 500);
 
     Thread.sleep(3000);
     // Thread.currentThread().join();
@@ -224,7 +226,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
 
     callback = new TestAsyncCallback(60000);
 
-    _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback, 60000);
+    _participants[0].getMessagingService().send(cr, msg, callback, 60000);
 
     Thread.sleep(2000);
     // Thread.currentThread().join();
@@ -232,7 +234,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     AssertJUnit.assertTrue(callback.getMessageReplied().size() == 1);
 
     callback2 = new TestAsyncCallback(500);
-    _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback2, 500);
+    _participants[0].getMessagingService().send(cr, msg, callback2, 500);
 
     Thread.sleep(3000);
     // Thread.currentThread().join();
@@ -246,8 +248,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     String hostDest = "localhost_" + (START_PORT + 1);
 
     TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
-    _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
-        factory.getMessageType(), factory);
+    _participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+        factory);
 
     String msgId = new UUID(123, 456).toString();
     Message msg = new Message(factory.getMessageType(), msgId);
@@ -266,8 +268,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
 
     AsyncCallback asyncCallback = new MockAsyncCallback();
     int messagesSent =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            asyncCallback, 60000);
+        _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback, 60000);
 
     AssertJUnit.assertTrue(asyncCallback.getMessageReplied().get(0).getRecord()
         .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage")
@@ -275,9 +276,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     AssertJUnit.assertTrue(asyncCallback.getMessageReplied().size() == 1);
 
     AsyncCallback asyncCallback2 = new MockAsyncCallback();
-    messagesSent =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            asyncCallback2, 500);
+    messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback2, 500);
     AssertJUnit.assertTrue(asyncCallback2.isTimedOut());
 
   }
@@ -289,8 +288,9 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     for (int i = 0; i < NODE_NR; i++) {
       TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
       String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+      _participants[0].getMessagingService().registerMessageHandlerFactory(
           factory.getMessageType(), factory);
+
     }
     String msgId = new UUID(123, 456).toString();
     Message msg = new Message(new TestMessagingHandlerFactory().getMessageType(), msgId);
@@ -308,8 +308,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     cr.setSessionSpecific(false);
     AsyncCallback callback1 = new MockAsyncCallback();
     int messageSent1 =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback1, 10000);
+        _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
 
     AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
         .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage")
@@ -317,37 +316,32 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR - 1);
 
     AsyncCallback callback2 = new MockAsyncCallback();
-    int messageSent2 =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback2, 500);
+    int messageSent2 = _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 500);
+
     AssertJUnit.assertTrue(callback2.isTimedOut());
 
     cr.setPartition("TestDB_17");
     AsyncCallback callback3 = new MockAsyncCallback();
     int messageSent3 =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback3, 10000);
+        _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
     AssertJUnit.assertTrue(callback3.getMessageReplied().size() == _replica - 1);
 
     cr.setPartition("TestDB_15");
     AsyncCallback callback4 = new MockAsyncCallback();
     int messageSent4 =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback4, 10000);
+        _participants[0].getMessagingService().sendAndWait(cr, msg, callback4, 10000);
     AssertJUnit.assertTrue(callback4.getMessageReplied().size() == _replica);
 
     cr.setPartitionState("SLAVE");
     AsyncCallback callback5 = new MockAsyncCallback();
     int messageSent5 =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback5, 10000);
+        _participants[0].getMessagingService().sendAndWait(cr, msg, callback5, 10000);
     AssertJUnit.assertTrue(callback5.getMessageReplied().size() == _replica - 1);
 
     cr.setDataSource(DataSource.IDEALSTATES);
     AsyncCallback callback6 = new MockAsyncCallback();
     int messageSent6 =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback6, 10000);
+        _participants[0].getMessagingService().sendAndWait(cr, msg, callback6, 10000);
     AssertJUnit.assertTrue(callback6.getMessageReplied().size() == _replica - 1);
   }
 
@@ -358,8 +352,9 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     for (int i = 0; i < NODE_NR; i++) {
       TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
       String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           factory.getMessageType(), factory);
+
     }
     String msgId = new UUID(123, 456).toString();
     Message msg = new Message(new TestMessagingHandlerFactory().getMessageType(), msgId);
@@ -378,8 +373,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     cr.setSelfExcluded(false);
     AsyncCallback callback1 = new MockAsyncCallback();
     int messageSent1 =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback1, 10000);
+        _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
 
     AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR);
     AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
@@ -394,8 +388,9 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     for (int i = 0; i < NODE_NR; i++) {
       TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
       String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           factory.getMessageType(), factory);
+
     }
     String msgId = new UUID(123, 456).toString();
     Message msg = new Message(MessageType.CONTROLLER_MSG, msgId);
@@ -414,8 +409,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
 
     AsyncCallback callback1 = new MockAsyncCallback();
     int messagesSent =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback1, 10000);
+        _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
 
     AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
         .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
@@ -426,9 +420,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     msg.setMsgId(msgId);
     cr.setPartition("TestDB_17");
     AsyncCallback callback2 = new MockAsyncCallback();
-    messagesSent =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback2, 10000);
+    messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 10000);
+
     AssertJUnit.assertTrue(callback2.getMessageReplied().get(0).getRecord()
         .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
         .indexOf(hostSrc) != -1);
@@ -439,9 +432,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     msg.setMsgId(msgId);
     cr.setPartitionState("SLAVE");
     AsyncCallback callback3 = new MockAsyncCallback();
-    messagesSent =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback3, 10000);
+    messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
     AssertJUnit.assertTrue(callback3.getMessageReplied().get(0).getRecord()
         .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
         .indexOf(hostSrc) != -1);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
index 5a6e5e6..734e2b4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
@@ -22,9 +22,9 @@ package org.apache.helix.integration;
 import java.util.Date;
 
 import org.apache.helix.TestHelper;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.MockBootstrapModelFactory;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.tools.ClusterSetup;
@@ -51,18 +51,19 @@ public class TestNonOfflineInitState extends ZkIntegrationTestBase {
         1, // replicas
         "Bootstrap", true); // do rebalance
 
-    TestHelper
-        .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
 
     // start participants
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
 
       // add a state model with non-OFFLINE initial state
-      StateMachineEngine stateMach = participants[i].getManager().getStateMachineEngine();
+      StateMachineEngine stateMach = participants[i].getStateMachineEngine();
       MockBootstrapModelFactory bootstrapFactory = new MockBootstrapModelFactory();
       stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory);
 
@@ -74,16 +75,21 @@ public class TestNonOfflineInitState extends ZkIntegrationTestBase {
             clusterName));
     Assert.assertTrue(result);
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+    }
+
     System.out.println("END testNonOfflineInitState at " + new Date(System.currentTimeMillis()));
   }
 
   private static void setupCluster(String clusterName, String ZkAddr, int startPort,
       String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb,
       int nodesNb, int replica, String stateModelDef, boolean doRebalance) throws Exception {
-    ZkClient zkClient = new ZkClient(ZkAddr);
-    if (zkClient.exists("/" + clusterName)) {
+    if (_gZkClient.exists("/" + clusterName)) {
       LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
-      zkClient.deleteRecursive("/" + clusterName);
+      _gZkClient.deleteRecursive("/" + clusterName);
     }
 
     ClusterSetup setupTool = new ClusterSetup(ZkAddr);
@@ -103,7 +109,6 @@ public class TestNonOfflineInitState extends ZkIntegrationTestBase {
         setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
       }
     }
-    zkClient.close();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
index 496d1a6..0f5cc72 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
@@ -25,8 +25,8 @@ import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -44,7 +44,7 @@ public class TestNullReplica extends ZkIntegrationTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -61,14 +61,15 @@ public class TestNullReplica extends ZkIntegrationTestBase {
     idealState.getSimpleFields().remove(IdealState.IdealStateProperty.REPLICAS.toString());
     _gZkClient.writeData(idealStatePath, idealState);
 
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -78,13 +79,11 @@ public class TestNullReplica extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // clean up
+    controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }
 
-    Thread.sleep(2000);
-    controller.syncStop();
-
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
index 8336939..cf59ed0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
@@ -31,10 +31,13 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase {
+  private static Logger LOG = Logger.getLogger(TestParticipantErrorMessage.class);
+
   @Test()
   public void TestParticipantErrorMessageSend() {
     String participant1 = "localhost_" + START_PORT;
@@ -49,7 +52,7 @@ public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase {
     Criteria recipientCriteria = new Criteria();
     recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);
     recipientCriteria.setSessionSpecific(false);
-    _startCMResultMap.get(participant1)._manager.getMessagingService().send(recipientCriteria,
+    _participants[0].getMessagingService().send(recipientCriteria,
         errorMessage1);
 
     Message errorMessage2 =
@@ -63,23 +66,22 @@ public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase {
     Criteria recipientCriteria2 = new Criteria();
     recipientCriteria2.setRecipientInstanceType(InstanceType.CONTROLLER);
     recipientCriteria2.setSessionSpecific(false);
-    _startCMResultMap.get(participant2)._manager.getMessagingService().send(recipientCriteria2,
+    _participants[1].getMessagingService().send(recipientCriteria2,
         errorMessage2);
 
     try {
       Thread.sleep(1500);
     } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error("Interrupted sleep", e);
     }
 
     boolean result =
         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
             CLUSTER_NAME));
     Assert.assertTrue(result);
-    Builder kb = _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().keyBuilder();
+    Builder kb = _participants[1].getHelixDataAccessor().keyBuilder();
     ExternalView externalView =
-        _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().getProperty(
+        _participants[1].getHelixDataAccessor().getProperty(
             kb.externalView("TestDB"));
 
     for (String partitionName : externalView.getRecord().getMapFields().keySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
index 4227688..aa67ac9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
@@ -22,7 +22,7 @@ package org.apache.helix.integration;
 import java.util.Date;
 
 import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.log4j.Logger;
 import org.testng.annotations.Test;
 
@@ -33,20 +33,21 @@ public class TestParticipantNameCollision extends ZkStandAloneCMTestBase {
   public void testParticiptantNameCollision() throws Exception {
     logger.info("RUN TestParticipantNameCollision() at " + new Date(System.currentTimeMillis()));
 
-    StartCMResult result = null;
+    MockParticipantManager newParticipant = null;
     for (int i = 0; i < 1; i++) {
       String instanceName = "localhost_" + (START_PORT + i);
       try {
         // the call fails on getClusterManagerForParticipant()
         // no threads start
-        result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
+        newParticipant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+        newParticipant.syncStart();
       } catch (Exception e) {
         e.printStackTrace();
       }
     }
 
     Thread.sleep(30000);
-    TestHelper.verifyWithTimeout("verifyNotConnected", 30 * 1000, result._manager);
+    TestHelper.verifyWithTimeout("verifyNotConnected", 30 * 1000, newParticipant);
 
     logger.info("STOP TestParticipantNameCollision() at " + new Date(System.currentTimeMillis()));
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
index d900d98..a1c413b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
@@ -24,12 +24,12 @@ import java.util.Date;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.PauseSignal;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -47,7 +47,7 @@ public class TestPauseSignal extends ZkIntegrationTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -59,14 +59,15 @@ public class TestPauseSignal extends ZkIntegrationTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -112,13 +113,11 @@ public class TestPauseSignal extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // clean up
+    controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }
 
-    Thread.sleep(2000);
-    controller.syncStop();
-
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
index ed056ab..5f4377d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
@@ -23,14 +23,15 @@ import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -39,6 +40,14 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestRenamePartition extends ZkIntegrationTestBase {
+  // map from clusterName to participants
+  final Map<String, MockParticipantManager[]> _participantMap =
+      new ConcurrentHashMap<String, MockParticipantManager[]>();
+
+  // map from clusterName to controllers
+  final Map<String, ClusterControllerManager> _controllerMap =
+      new ConcurrentHashMap<String, ClusterControllerManager>();
+
   @Test()
   public void testRenamePartitionAutoIS() throws Exception {
     String clusterName = "CLUSTER_" + getShortClassName() + "_auto";
@@ -57,7 +66,7 @@ public class TestRenamePartition extends ZkIntegrationTestBase {
 
     // rename partition name TestDB0_0 tp TestDB0_100
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
@@ -71,8 +80,8 @@ public class TestRenamePartition extends ZkIntegrationTestBase {
             ZK_ADDR, clusterName));
     Assert.assertTrue(result);
 
+    stop(clusterName);
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
   }
 
   @Test()
@@ -103,7 +112,7 @@ public class TestRenamePartition extends ZkIntegrationTestBase {
     idealState.setStateModelDefRef("MasterSlave");
 
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
@@ -118,23 +127,25 @@ public class TestRenamePartition extends ZkIntegrationTestBase {
         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
             ZK_ADDR, clusterName));
     Assert.assertTrue(result);
+
+    stop(clusterName);
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
   }
 
   private void startAndVerify(String clusterName) throws Exception {
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
-    TestHelper
-        .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
 
     // start participants
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
-      // new Thread(participants[i]).start();
     }
 
     boolean result =
@@ -142,5 +153,21 @@ public class TestRenamePartition extends ZkIntegrationTestBase {
             ZK_ADDR, clusterName));
     Assert.assertTrue(result);
 
+    _participantMap.put(clusterName, participants);
+    _controllerMap.put(clusterName, controller);
+  }
+
+  private void stop(String clusterName) {
+    ClusterControllerManager controller = _controllerMap.get(clusterName);
+    if (controller != null) {
+      controller.syncStop();
+    }
+
+    MockParticipantManager[] participants = _participantMap.get(clusterName);
+    if (participants != null) {
+      for (MockParticipantManager participant : participants) {
+        participant.syncStop();
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
index 2715932..7159a17 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
@@ -25,8 +25,8 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.TestHelper;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.ErrTransition;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -54,7 +54,8 @@ public class TestResetInstance extends ZkIntegrationTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
@@ -65,16 +66,15 @@ public class TestResetInstance extends ZkIntegrationTestBase {
     };
 
     // start mock participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
       if (i == 0) {
-        participants[i] =
-            new MockParticipant(clusterName, instanceName, ZK_ADDR,
-                new ErrTransition(errPartitions));
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participants[i].setTransition(new ErrTransition(errPartitions));
       } else {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       }
       participants[i].syncStart();
     }
@@ -102,8 +102,6 @@ public class TestResetInstance extends ZkIntegrationTestBase {
     Assert.assertTrue(result, "Cluster verification fails");
 
     // clean up
-    // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
index 09e57c6..c37a5ea 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
@@ -28,10 +28,10 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.ErrTransition;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
@@ -80,7 +80,8 @@ public class TestResetPartitionState extends ZkIntegrationTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
@@ -91,16 +92,16 @@ public class TestResetPartitionState extends ZkIntegrationTestBase {
     };
 
     // start mock participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
       if (i == 0) {
         participants[i] =
-            new MockParticipant(clusterName, instanceName, ZK_ADDR,
-                new ErrTransition(errPartitions));
+            new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participants[i].setTransition(new ErrTransition(errPartitions));
       } else {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       }
       participants[i].syncStart();
     }
@@ -169,8 +170,6 @@ public class TestResetPartitionState extends ZkIntegrationTestBase {
     Assert.assertEquals(_errToOfflineInvoked, 2, "Should reset 2 partitions");
 
     // clean up
-    // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
index de4ad1a..46a05d8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
@@ -25,8 +25,8 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.TestHelper;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.ErrTransition;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -53,7 +53,8 @@ public class TestResetResource extends ZkIntegrationTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
@@ -64,16 +65,15 @@ public class TestResetResource extends ZkIntegrationTestBase {
     };
 
     // start mock participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
       if (i == 0) {
-        participants[i] =
-            new MockParticipant(clusterName, instanceName, ZK_ADDR,
-                new ErrTransition(errPartitions));
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participants[i].setTransition(new ErrTransition(errPartitions));
       } else {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       }
       participants[i].syncStart();
     }
@@ -101,8 +101,6 @@ public class TestResetResource extends ZkIntegrationTestBase {
     Assert.assertTrue(result, "Cluster verification fails");
 
     // clean up
-    // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
index 008782c..64043ed 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
@@ -24,8 +24,8 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -35,15 +35,15 @@ import org.testng.annotations.Test;
 
 public class TestRestartParticipant extends ZkIntegrationTestBase {
   public class KillOtherTransition extends MockTransition {
-    final AtomicReference<MockParticipant> _other;
+    final AtomicReference<MockParticipantManager> _other;
 
-    public KillOtherTransition(MockParticipant other) {
-      _other = new AtomicReference<MockParticipant>(other);
+    public KillOtherTransition(MockParticipantManager other) {
+      _other = new AtomicReference<MockParticipantManager>(other);
     }
 
     @Override
     public void doTransition(Message message, NotificationContext context) {
-      MockParticipant other = _other.getAndSet(null);
+      MockParticipantManager other = _other.getAndSet(null);
       if (other != null) {
         System.err.println("Kill " + other.getInstanceName()
             + ". Interrupted exceptions are IGNORABLE");
@@ -58,7 +58,7 @@ public class TestRestartParticipant extends ZkIntegrationTestBase {
     System.out.println("START testRestartParticipant at " + new Date(System.currentTimeMillis()));
 
     String clusterName = getShortClassName();
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -69,19 +69,19 @@ public class TestRestartParticipant extends ZkIntegrationTestBase {
         3, // replicas
         "MasterSlave", true); // do rebalance
 
-    TestHelper
-        .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
     // start participants
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
       if (i == 4) {
-        participants[i] =
-            new MockParticipant(clusterName, instanceName, ZK_ADDR, new KillOtherTransition(
-                participants[0]));
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participants[i].setTransition(new KillOtherTransition(participants[0]));
       } else {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
-        // Thread.sleep(100);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       }
 
       participants[i].syncStart();
@@ -94,9 +94,9 @@ public class TestRestartParticipant extends ZkIntegrationTestBase {
 
     // restart
     Thread.sleep(500);
-    MockParticipant participant =
-        new MockParticipant(participants[0].getClusterName(), participants[0].getInstanceName(),
-            ZK_ADDR, null);
+    MockParticipantManager participant =
+        new MockParticipantManager(ZK_ADDR, participants[0].getClusterName(),
+            participants[0].getInstanceName());
     System.err.println("Restart " + participant.getInstanceName());
     participant.syncStart();
     result =
@@ -104,6 +104,13 @@ public class TestRestartParticipant extends ZkIntegrationTestBase {
             clusterName));
     Assert.assertTrue(result);
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+    }
+    participant.syncStop();
+
     System.out.println("START testRestartParticipant at " + new Date(System.currentTimeMillis()));
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 2c174c4..bf851cc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -194,10 +194,10 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     _factory._results.clear();
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           _factory.getMessageType(), _factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
+
+      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
     }
 
     Message schedulerMessage =
@@ -282,10 +282,10 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     _factory._results.clear();
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           _factory.getMessageType(), _factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
+
+      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
     }
 
     Message schedulerMessage =
@@ -367,11 +367,11 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     String controllerStatusPath =
         HelixUtil.getControllerPropertyPath(manager.getClusterName(),
             PropertyType.STATUSUPDATES_CONTROLLER);
-    List<String> subPaths = _zkClient.getChildren(controllerStatusPath);
+    List<String> subPaths = _gZkClient.getChildren(controllerStatusPath);
     Assert.assertTrue(subPaths.size() > 0);
     for (String subPath : subPaths) {
       String nextPath = controllerStatusPath + "/" + subPath;
-      List<String> subsubPaths = _zkClient.getChildren(nextPath);
+      List<String> subsubPaths = _gZkClient.getChildren(nextPath);
       Assert.assertTrue(subsubPaths.size() > 0);
     }
 
@@ -379,38 +379,38 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
         HelixUtil.getInstancePropertyPath(manager.getClusterName(), "localhost_" + (START_PORT),
             PropertyType.STATUSUPDATES);
 
-    subPaths = _zkClient.getChildren(instanceStatusPath);
+    subPaths = _gZkClient.getChildren(instanceStatusPath);
     Assert.assertTrue(subPaths.size() > 0);
     for (String subPath : subPaths) {
       String nextPath = instanceStatusPath + "/" + subPath;
-      List<String> subsubPaths = _zkClient.getChildren(nextPath);
+      List<String> subsubPaths = _gZkClient.getChildren(nextPath);
       Assert.assertTrue(subsubPaths.size() > 0);
       for (String subsubPath : subsubPaths) {
         String nextnextPath = nextPath + "/" + subsubPath;
-        Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() > 0);
+        Assert.assertTrue(_gZkClient.getChildren(nextnextPath).size() > 0);
       }
     }
     Thread.sleep(3000);
-    ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _zkClient, 0);
+    ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _gZkClient, 0);
     dumpTask.run();
 
-    subPaths = _zkClient.getChildren(controllerStatusPath);
+    subPaths = _gZkClient.getChildren(controllerStatusPath);
     Assert.assertTrue(subPaths.size() > 0);
     for (String subPath : subPaths) {
       String nextPath = controllerStatusPath + "/" + subPath;
-      List<String> subsubPaths = _zkClient.getChildren(nextPath);
+      List<String> subsubPaths = _gZkClient.getChildren(nextPath);
       Assert.assertTrue(subsubPaths.size() == 0);
     }
 
-    subPaths = _zkClient.getChildren(instanceStatusPath);
+    subPaths = _gZkClient.getChildren(instanceStatusPath);
     Assert.assertTrue(subPaths.size() > 0);
     for (String subPath : subPaths) {
       String nextPath = instanceStatusPath + "/" + subPath;
-      List<String> subsubPaths = _zkClient.getChildren(nextPath);
+      List<String> subsubPaths = _gZkClient.getChildren(nextPath);
       Assert.assertTrue(subsubPaths.size() > 0);
       for (String subsubPath : subsubPaths) {
         String nextnextPath = nextPath + "/" + subsubPath;
-        Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() == 0);
+        Assert.assertTrue(_gZkClient.getChildren(nextnextPath).size() == 0);
       }
     }
   }
@@ -420,10 +420,10 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     _factory._results.clear();
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           _factory.getMessageType(), _factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
+
+      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
     }
 
     Message schedulerMessage =
@@ -511,10 +511,10 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           factory.getMessageType(), factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
+
+      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
     }
 
     Message schedulerMessage =
@@ -581,13 +581,13 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     _factory._results.clear();
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           _factory.getMessageType(), _factory);
-      //
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           _factory.getMessageType(), _factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
+
+      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
     }
 
     Message schedulerMessage =
@@ -702,13 +702,13 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     _factory._results.clear();
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           _factory.getMessageType(), _factory);
-      //
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           _factory.getMessageType(), _factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
+
+      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
     }
 
     Message schedulerMessage =
@@ -852,13 +852,13 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     TestMessagingHandlerFactoryLatch factory = new TestMessagingHandlerFactoryLatch();
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           factory.getMessageType(), factory);
-      //
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           factory.getMessageType(), factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
+
+      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
     }
 
     Message schedulerMessage =

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
index 3024f45..a927520 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
@@ -27,10 +27,10 @@ import org.apache.helix.HelixConstants;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -46,7 +46,7 @@ public class TestSchemataSM extends ZkIntegrationTestBase {
     String clusterName = className + "_" + methodName;
     int n = 5;
 
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
@@ -70,14 +70,15 @@ public class TestSchemataSM extends ZkIntegrationTestBase {
         Arrays.asList(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString()));
     accessor.setProperty(key, idealState);
 
-    ClusterController controller = new ClusterController(clusterName, "controller", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
     controller.syncStart();
 
     // start n-1 participants
     for (int i = 1; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -87,7 +88,7 @@ public class TestSchemataSM extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // start the remaining 1 participant
-    participants[0] = new MockParticipant(clusterName, "localhost_12918", ZK_ADDR, null);
+    participants[0] = new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12918");
     participants[0].syncStart();
 
     // make sure we have all participants in MASTER state
@@ -107,6 +108,7 @@ public class TestSchemataSM extends ZkIntegrationTestBase {
     }
 
     // clean up
+    controller.syncStop();
     for (int i = 0; i < n; i++) {
       participants[i].syncStop();
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
index f38c6de..965b8ef 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
@@ -25,10 +25,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
 import org.apache.helix.ZkTestHelper;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -39,13 +38,14 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
+  private static Logger LOG = Logger.getLogger(TestSessionExpiryInTransition.class);
 
   public class SessionExpiryTransition extends MockTransition {
     private final AtomicBoolean _done = new AtomicBoolean();
 
     @Override
     public void doTransition(Message message, NotificationContext context) {
-      ZkHelixTestManager manager = (ZkHelixTestManager) context.getManager();
+      MockParticipantManager manager = (MockParticipantManager) context.getManager();
 
       String instance = message.getTgtName();
       String partition = message.getPartitionName();
@@ -55,8 +55,7 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
         try {
           ZkTestHelper.expireSession(manager.getZkClient());
         } catch (Exception e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
+          LOG.error("Exception expire zk-session", e);
         }
       }
     }
@@ -64,7 +63,7 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
 
   @Test
   public void testSessionExpiryInTransition() throws Exception {
-    Logger.getRootLogger().setLevel(Level.WARN);
+    // Logger.getRootLogger().setLevel(Level.WARN);
 
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
@@ -72,7 +71,7 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -84,15 +83,15 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      ZkHelixTestManager manager =
-          new ZkHelixTestManager(clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
-      participants[i] = new MockParticipant(manager, new SessionExpiryTransition());
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].setTransition(new SessionExpiryTransition());
       participants[i].syncStart();
     }
 
@@ -102,13 +101,11 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // clean up
+    controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }
 
-    Thread.sleep(2000);
-    controller.syncStop();
-
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
index 02a34d8..6eb7a8c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
@@ -21,9 +21,15 @@ package org.apache.helix.integration;
 
 import java.util.Date;
 
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.TestHelper.Verifier;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.LiveInstance;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
@@ -35,16 +41,34 @@ public class TestStandAloneCMMain extends ZkStandAloneCMTestBase {
   @Test()
   public void testStandAloneCMMain() throws Exception {
     logger.info("RUN testStandAloneCMMain() at " + new Date(System.currentTimeMillis()));
-
+    ClusterControllerManager newController = null;
     for (int i = 1; i <= 2; i++) {
       String controllerName = "controller_" + i;
-      StartCMResult startResult =
-          TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
-              HelixControllerMain.STANDALONE);
-      _startCMResultMap.put(controllerName, startResult);
+      newController =
+          new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+      newController.syncStart();
     }
 
-    stopCurrentLeader(_zkClient, CLUSTER_NAME, _startCMResultMap);
+    // stopCurrentLeader(_zkClient, CLUSTER_NAME, _startCMResultMap);
+    _controller.syncStop();
+
+    final HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    final String newControllerName = newController.getInstanceName();
+    TestHelper.verify(new Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+        if (leader == null) {
+          return false;
+        }
+        return leader.getInstanceName().equals(newControllerName);
+
+      }
+    }, 30 * 1000);
+
     boolean result =
         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
             ZK_ADDR, CLUSTER_NAME));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
index 347ff7e..e8adf03 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
@@ -23,9 +23,9 @@ import java.util.Date;
 
 import org.apache.helix.InstanceType;
 import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
 import org.apache.helix.ZkTestHelper;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
@@ -47,18 +47,16 @@ public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase {
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, PARTICIPANT_PREFIX, "TestDB", 1, 20, 5, 3,
         "MasterSlave", true);
 
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      ZkHelixTestManager manager =
-          new ZkHelixTestManager(clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
-      participants[i] = new MockParticipant(manager, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
-    ZkHelixTestManager controller =
-        new ZkHelixTestManager(clusterName, "controller_0", InstanceType.CONTROLLER, ZK_ADDR);
-    controller.connect();
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
 
     boolean result;
     result =
@@ -67,7 +65,7 @@ public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // participant session expiry
-    ZkHelixTestManager participantToExpire = (ZkHelixTestManager) participants[1].getManager();
+    MockParticipantManager participantToExpire = participants[1];
 
     System.out.println("Expire participant session");
     String oldSessionId = participantToExpire.getSessionId();
@@ -107,8 +105,7 @@ public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase {
     // clean up
     System.out.println("Clean up ...");
     // Logger.getRootLogger().setLevel(Level.DEBUG);
-    controller.disconnect();
-    Thread.sleep(100);
+    controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
index dce3fd4..d191c18 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
@@ -25,7 +25,7 @@ import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
-import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -54,10 +54,10 @@ public class TestStartMultipleControllersWithSameName extends ZkIntegrationTestB
     // rebalance
 
     // start controller
-    ClusterController[] controllers = new ClusterController[4];
+    ClusterControllerManager[] controllers = new ClusterControllerManager[4];
     for (int i = 0; i < 4; i++) {
-      controllers[i] = new ClusterController(clusterName, "controller_0", ZK_ADDR);
-      controllers[i].start();
+      controllers[i] = new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+      controllers[i].syncStart();
     }
 
     Thread.sleep(500); // wait leader election finishes
@@ -69,7 +69,6 @@ public class TestStartMultipleControllersWithSameName extends ZkIntegrationTestB
     // clean up
     for (int i = 0; i < 4; i++) {
       controllers[i].syncStop();
-      Thread.sleep(1000); // wait for all zk callbacks done
     }
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
index edc10c6..a297752 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
@@ -28,22 +28,16 @@ import java.util.Set;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.NotificationContext;
-import org.apache.helix.TestHelper;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
-import org.apache.helix.mock.participant.MockJobIntf;
 import org.apache.helix.mock.participant.MockMSStateModel;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.mock.participant.SleepTransition;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.StateTransitionError;
@@ -59,15 +53,14 @@ import org.testng.annotations.Test;
 public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
   private static Logger LOG = Logger.getLogger(TestStateTransitionTimeout.class);
 
+  @Override
   @BeforeClass
   public void beforeClass() throws Exception {
     System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
 
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
     String namespace = "/" + CLUSTER_NAME;
-    if (_zkClient.exists(namespace)) {
-      _zkClient.deleteRecursive(namespace);
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
     }
     _setupTool = new ClusterSetup(ZK_ADDR);
 
@@ -106,12 +99,14 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
       _sleep = sleep;
     }
 
+    @Override
     @Transition(to = "SLAVE", from = "OFFLINE")
     public void onBecomeSlaveFromOffline(Message message, NotificationContext context) {
       LOG.info("Become SLAVE from OFFLINE");
 
     }
 
+    @Override
     @Transition(to = "MASTER", from = "SLAVE")
     public void onBecomeMasterFromSlave(Message message, NotificationContext context)
         throws InterruptedException {
@@ -121,23 +116,27 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
       }
     }
 
+    @Override
     @Transition(to = "SLAVE", from = "MASTER")
     public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
       LOG.info("Become SLAVE from MASTER");
     }
 
+    @Override
     @Transition(to = "OFFLINE", from = "SLAVE")
     public void onBecomeOfflineFromSlave(Message message, NotificationContext context) {
       LOG.info("Become OFFLINE from SLAVE");
 
     }
 
+    @Override
     @Transition(to = "DROPPED", from = "OFFLINE")
     public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
       LOG.info("Become DROPPED from OFFLINE");
 
     }
 
+    @Override
     public void rollbackOnError(Message message, NotificationContext context,
         StateTransitionError error) {
       _error = error;
@@ -171,7 +170,7 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
   @Test
   public void testStateTransitionTimeOut() throws Exception {
     Map<String, SleepStateModelFactory> factories = new HashMap<String, SleepStateModelFactory>();
-    MockParticipant[] participants = new MockParticipant[NODE_NR];
+    // MockParticipantManager[] participants = new MockParticipantManager[NODE_NR];
     IdealState idealState =
         _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
     for (int i = 0; i < NODE_NR; i++) {
@@ -184,19 +183,20 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
         }
       }
 
-      participants[i] = new MockParticipant(factory, CLUSTER_NAME, instanceName, ZK_ADDR, null);
-      participants[i].syncStart();
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", factory);
+      _participants[i].syncStart();
     }
     String controllerName = CONTROLLER_PREFIX + "_0";
-    StartCMResult startResult =
-        TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
-            HelixControllerMain.STANDALONE);
-    _startCMResultMap.put(controllerName, startResult);
+    _controller =
+        new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
     boolean result =
         ClusterStateVerifier
             .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
     Assert.assertTrue(result);
-    HelixDataAccessor accessor = participants[0].getManager().getHelixDataAccessor();
+    HelixDataAccessor accessor = _participants[0].getHelixDataAccessor();
 
     Builder kb = accessor.keyBuilder();
     ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
index a1f63aa..6cce716 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
@@ -21,9 +21,8 @@ package org.apache.helix.integration;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -33,8 +32,7 @@ import org.testng.annotations.Test;
 public class TestSwapInstance extends ZkStandAloneCMTestBase {
   @Test
   public void TestSwap() throws Exception {
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    HelixManager manager = _controller;
     HelixDataAccessor helixAccessor = manager.getHelixDataAccessor();
     _setupTool.addResourceToCluster(CLUSTER_NAME, "MyDB", 64, STATE_MODEL);
     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", _replica);
@@ -49,7 +47,7 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase {
     idealStateOld2.merge(is2.getRecord());
 
     String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
-    ZKHelixAdmin tool = new ZKHelixAdmin(_zkClient);
+    ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
     _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, false);
 
     boolean result =
@@ -68,8 +66,7 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase {
     }
     Assert.assertTrue(exception);
 
-    _startCMResultMap.get(instanceName)._manager.disconnect();
-    _startCMResultMap.get(instanceName)._thread.interrupt();
+    _participants[0].syncStop();
     Thread.sleep(1000);
 
     exception = false;
@@ -80,8 +77,9 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase {
       exception = true;
     }
     Assert.assertFalse(exception);
-    StartCMResult result2 = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName2);
-    _startCMResultMap.put(instanceName2, result2);
+    MockParticipantManager newParticipant =
+        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName2);
+    newParticipant.syncStart();
 
     result =
         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(