You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/11/11 22:09:58 UTC

[04/10] [HELIX-279] Apply gc handling fixes to ZKHelixManager

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
index 8cbe55b..0c7c131 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle.java
@@ -29,13 +29,13 @@ import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.Message;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.builder.ConstraintItemBuilder;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -49,8 +49,7 @@ public class TestMessageThrottle extends ZkIntegrationTestBase {
     // Logger.getRootLogger().setLevel(Level.INFO);
 
     String clusterName = getShortClassName();
-    MockParticipant[] participants = new MockParticipant[5];
-    // ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
@@ -114,13 +113,15 @@ public class TestMessageThrottle extends ZkIntegrationTestBase {
       });
     }
 
-    TestHelper
-        .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
     // start participants
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -137,6 +138,7 @@ public class TestMessageThrottle extends ZkIntegrationTestBase {
     Assert.assertTrue(success.get());
 
     // clean up
+    controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
index a947445..a182753 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessageThrottle2.java
@@ -32,7 +32,6 @@ import org.apache.helix.ExternalViewChangeListener;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.IdealStateChangeListener;
 import org.apache.helix.InstanceConfigChangeListener;
 import org.apache.helix.InstanceType;
@@ -246,8 +245,7 @@ public class TestMessageThrottle2 extends ZkIntegrationTestBase {
 
     public void start() throws Exception {
       helixManager =
-          HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
-              InstanceType.PARTICIPANT, ZK_ADDR);
+          new ZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
       {
         // hack to set sessionTimeout
         Field sessionTimeout = ZKHelixManager.class.getDeclaredField("_sessionTimeout");

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
index 9c6b4b7..86f1ce4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagingService.java
@@ -89,8 +89,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     String hostDest = "localhost_" + (START_PORT + 1);
 
     TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
-    _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
-        factory.getMessageType(), factory);
+    _participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+        factory);
 
     MessageId msgId = MessageId.from(new UUID(123, 456).toString());
     Message msg = new Message(factory.getMessageType(), msgId);
@@ -106,7 +106,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
     cr.setSessionSpecific(false);
 
-    int nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+    // int nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+    int nMsgs = _participants[0].getMessagingService().send(cr, msg);
     AssertJUnit.assertTrue(nMsgs == 1);
     Thread.sleep(2500);
     // Thread.currentThread().join();
@@ -118,7 +119,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     cr.setSessionSpecific(false);
     cr.setDataSource(DataSource.IDEALSTATES);
 
-    nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+    // nMsgs = _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg);
+    nMsgs = _participants[0].getMessagingService().send(cr, msg);
     AssertJUnit.assertTrue(nMsgs == 1);
     Thread.sleep(2500);
     // Thread.currentThread().join();
@@ -181,11 +183,11 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     String hostDest = "localhost_" + (START_PORT + 1);
 
     TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
-    _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
-        factory.getMessageType(), factory);
+    _participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+        factory);
 
-    _startCMResultMap.get(hostSrc)._manager.getMessagingService().registerMessageHandlerFactory(
-        factory.getMessageType(), factory);
+    _participants[0].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+        factory);
 
     MessageId msgId = MessageId.from(new UUID(123, 456).toString());
     Message msg = new Message(factory.getMessageType(), msgId);
@@ -204,7 +206,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
 
     TestAsyncCallback callback = new TestAsyncCallback(60000);
 
-    _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback, 60000);
+    _participants[0].getMessagingService().send(cr, msg, callback, 60000);
 
     Thread.sleep(2000);
     // Thread.currentThread().join();
@@ -212,7 +214,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     AssertJUnit.assertTrue(callback.getMessageReplied().size() == 1);
 
     TestAsyncCallback callback2 = new TestAsyncCallback(500);
-    _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback2, 500);
+    _participants[0].getMessagingService().send(cr, msg, callback2, 500);
 
     Thread.sleep(3000);
     // Thread.currentThread().join();
@@ -226,7 +228,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
 
     callback = new TestAsyncCallback(60000);
 
-    _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback, 60000);
+    _participants[0].getMessagingService().send(cr, msg, callback, 60000);
 
     Thread.sleep(2000);
     // Thread.currentThread().join();
@@ -234,7 +236,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     AssertJUnit.assertTrue(callback.getMessageReplied().size() == 1);
 
     callback2 = new TestAsyncCallback(500);
-    _startCMResultMap.get(hostSrc)._manager.getMessagingService().send(cr, msg, callback2, 500);
+    _participants[0].getMessagingService().send(cr, msg, callback2, 500);
 
     Thread.sleep(3000);
     // Thread.currentThread().join();
@@ -248,8 +250,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     String hostDest = "localhost_" + (START_PORT + 1);
 
     TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
-    _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
-        factory.getMessageType(), factory);
+    _participants[1].getMessagingService().registerMessageHandlerFactory(factory.getMessageType(),
+        factory);
 
     MessageId msgId = MessageId.from(new UUID(123, 456).toString());
     Message msg = new Message(factory.getMessageType(), msgId);
@@ -268,8 +270,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
 
     AsyncCallback asyncCallback = new MockAsyncCallback();
     int messagesSent =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            asyncCallback, 60000);
+        _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback, 60000);
 
     AssertJUnit.assertTrue(asyncCallback.getMessageReplied().get(0).getRecord()
         .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage")
@@ -277,9 +278,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     AssertJUnit.assertTrue(asyncCallback.getMessageReplied().size() == 1);
 
     AsyncCallback asyncCallback2 = new MockAsyncCallback();
-    messagesSent =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            asyncCallback2, 500);
+    messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, asyncCallback2, 500);
     AssertJUnit.assertTrue(asyncCallback2.isTimedOut());
 
   }
@@ -291,8 +290,9 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     for (int i = 0; i < NODE_NR; i++) {
       TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
       String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           factory.getMessageType(), factory);
+
     }
     MessageId msgId = MessageId.from(new UUID(123, 456).toString());
     Message msg = new Message(new TestMessagingHandlerFactory().getMessageType(), msgId);
@@ -310,8 +310,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     cr.setSessionSpecific(false);
     AsyncCallback callback1 = new MockAsyncCallback();
     int messageSent1 =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback1, 10000);
+        _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
 
     AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
         .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ReplyMessage")
@@ -319,37 +318,32 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR - 1);
 
     AsyncCallback callback2 = new MockAsyncCallback();
-    int messageSent2 =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback2, 500);
+    int messageSent2 = _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 500);
+
     AssertJUnit.assertTrue(callback2.isTimedOut());
 
     cr.setPartition("TestDB_17");
     AsyncCallback callback3 = new MockAsyncCallback();
     int messageSent3 =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback3, 10000);
+        _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
     AssertJUnit.assertTrue(callback3.getMessageReplied().size() == _replica - 1);
 
     cr.setPartition("TestDB_15");
     AsyncCallback callback4 = new MockAsyncCallback();
     int messageSent4 =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback4, 10000);
+        _participants[0].getMessagingService().sendAndWait(cr, msg, callback4, 10000);
     AssertJUnit.assertTrue(callback4.getMessageReplied().size() == _replica);
 
     cr.setPartitionState("SLAVE");
     AsyncCallback callback5 = new MockAsyncCallback();
     int messageSent5 =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback5, 10000);
+        _participants[0].getMessagingService().sendAndWait(cr, msg, callback5, 10000);
     AssertJUnit.assertTrue(callback5.getMessageReplied().size() == _replica - 1);
 
     cr.setDataSource(DataSource.IDEALSTATES);
     AsyncCallback callback6 = new MockAsyncCallback();
     int messageSent6 =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback6, 10000);
+        _participants[0].getMessagingService().sendAndWait(cr, msg, callback6, 10000);
     AssertJUnit.assertTrue(callback6.getMessageReplied().size() == _replica - 1);
   }
 
@@ -360,8 +354,9 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     for (int i = 0; i < NODE_NR; i++) {
       TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
       String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           factory.getMessageType(), factory);
+
     }
     MessageId msgId = MessageId.from(new UUID(123, 456).toString());
     Message msg = new Message(new TestMessagingHandlerFactory().getMessageType(), msgId);
@@ -380,8 +375,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     cr.setSelfExcluded(false);
     AsyncCallback callback1 = new MockAsyncCallback();
     int messageSent1 =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback1, 10000);
+        _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
 
     AssertJUnit.assertTrue(callback1.getMessageReplied().size() == NODE_NR);
     AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
@@ -396,8 +390,9 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     for (int i = 0; i < NODE_NR; i++) {
       TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
       String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           factory.getMessageType(), factory);
+
     }
     MessageId msgId = MessageId.from(new UUID(123, 456).toString());
     Message msg = new Message(MessageType.CONTROLLER_MSG, msgId);
@@ -416,8 +411,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
 
     AsyncCallback callback1 = new MockAsyncCallback();
     int messagesSent =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback1, 10000);
+        _participants[0].getMessagingService().sendAndWait(cr, msg, callback1, 10000);
 
     AssertJUnit.assertTrue(callback1.getMessageReplied().get(0).getRecord()
         .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
@@ -428,9 +422,8 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     msg.setMessageId(msgId);
     cr.setPartition("TestDB_17");
     AsyncCallback callback2 = new MockAsyncCallback();
-    messagesSent =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback2, 10000);
+    messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, callback2, 10000);
+
     AssertJUnit.assertTrue(callback2.getMessageReplied().get(0).getRecord()
         .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
         .indexOf(hostSrc) != -1);
@@ -441,9 +434,7 @@ public class TestMessagingService extends ZkStandAloneCMTestBaseWithPropertyServ
     msg.setMessageId(msgId);
     cr.setPartitionState("SLAVE");
     AsyncCallback callback3 = new MockAsyncCallback();
-    messagesSent =
-        _startCMResultMap.get(hostSrc)._manager.getMessagingService().sendAndWait(cr, msg,
-            callback3, 10000);
+    messagesSent = _participants[0].getMessagingService().sendAndWait(cr, msg, callback3, 10000);
     AssertJUnit.assertTrue(callback3.getMessageReplied().get(0).getRecord()
         .getMapField(Message.Attributes.MESSAGE_RESULT.toString()).get("ControllerResult")
         .indexOf(hostSrc) != -1);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
index c5558ca..aa48c90 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNonOfflineInitState.java
@@ -22,10 +22,9 @@ package org.apache.helix.integration;
 import java.util.Date;
 
 import org.apache.helix.TestHelper;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.MockBootstrapModelFactory;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -51,18 +50,19 @@ public class TestNonOfflineInitState extends ZkIntegrationTestBase {
         1, // replicas
         "Bootstrap", true); // do rebalance
 
-    TestHelper
-        .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
 
     // start participants
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
 
       // add a state model with non-OFFLINE initial state
-      StateMachineEngine stateMach = participants[i].getManager().getStateMachineEngine();
+      StateMachineEngine stateMach = participants[i].getStateMachineEngine();
       MockBootstrapModelFactory bootstrapFactory = new MockBootstrapModelFactory();
       stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory);
 
@@ -74,16 +74,21 @@ public class TestNonOfflineInitState extends ZkIntegrationTestBase {
             clusterName));
     Assert.assertTrue(result);
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+    }
+
     System.out.println("END testNonOfflineInitState at " + new Date(System.currentTimeMillis()));
   }
 
   private static void setupCluster(String clusterName, String ZkAddr, int startPort,
       String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb,
       int nodesNb, int replica, String stateModelDef, boolean doRebalance) throws Exception {
-    ZkClient zkClient = new ZkClient(ZkAddr);
-    if (zkClient.exists("/" + clusterName)) {
+    if (_gZkClient.exists("/" + clusterName)) {
       LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
-      zkClient.deleteRecursive("/" + clusterName);
+      _gZkClient.deleteRecursive("/" + clusterName);
     }
 
     ClusterSetup setupTool = new ClusterSetup(ZkAddr);
@@ -103,7 +108,6 @@ public class TestNonOfflineInitState extends ZkIntegrationTestBase {
         setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
       }
     }
-    zkClient.close();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
index 496d1a6..0f5cc72 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNullReplica.java
@@ -25,8 +25,8 @@ import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -44,7 +44,7 @@ public class TestNullReplica extends ZkIntegrationTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -61,14 +61,15 @@ public class TestNullReplica extends ZkIntegrationTestBase {
     idealState.getSimpleFields().remove(IdealState.IdealStateProperty.REPLICAS.toString());
     _gZkClient.writeData(idealStatePath, idealState);
 
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -78,13 +79,11 @@ public class TestNullReplica extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // clean up
+    controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }
 
-    Thread.sleep(2000);
-    controller.syncStop();
-
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
index 07a2fc0..750e2b7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantErrorMessage.java
@@ -35,10 +35,13 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase {
+  private static Logger LOG = Logger.getLogger(TestParticipantErrorMessage.class);
+
   @Test()
   public void TestParticipantErrorMessageSend() {
     String participant1 = "localhost_" + START_PORT;
@@ -54,7 +57,7 @@ public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase {
     Criteria recipientCriteria = new Criteria();
     recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);
     recipientCriteria.setSessionSpecific(false);
-    _startCMResultMap.get(participant1)._manager.getMessagingService().send(recipientCriteria,
+    _participants[0].getMessagingService().send(recipientCriteria,
         errorMessage1);
 
     Message errorMessage2 =
@@ -69,23 +72,22 @@ public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase {
     Criteria recipientCriteria2 = new Criteria();
     recipientCriteria2.setRecipientInstanceType(InstanceType.CONTROLLER);
     recipientCriteria2.setSessionSpecific(false);
-    _startCMResultMap.get(participant2)._manager.getMessagingService().send(recipientCriteria2,
+    _participants[1].getMessagingService().send(recipientCriteria2,
         errorMessage2);
 
     try {
       Thread.sleep(1500);
     } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error("Interrupted sleep", e);
     }
 
     boolean result =
         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
             CLUSTER_NAME));
     Assert.assertTrue(result);
-    Builder kb = _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().keyBuilder();
+    Builder kb = _participants[1].getHelixDataAccessor().keyBuilder();
     ExternalView externalView =
-        _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().getProperty(
+        _participants[1].getHelixDataAccessor().getProperty(
             kb.externalView("TestDB"));
 
     for (String partitionName : externalView.getRecord().getMapFields().keySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
index 4227688..aa67ac9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestParticipantNameCollision.java
@@ -22,7 +22,7 @@ package org.apache.helix.integration;
 import java.util.Date;
 
 import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.log4j.Logger;
 import org.testng.annotations.Test;
 
@@ -33,20 +33,21 @@ public class TestParticipantNameCollision extends ZkStandAloneCMTestBase {
   public void testParticiptantNameCollision() throws Exception {
     logger.info("RUN TestParticipantNameCollision() at " + new Date(System.currentTimeMillis()));
 
-    StartCMResult result = null;
+    MockParticipantManager newParticipant = null;
     for (int i = 0; i < 1; i++) {
       String instanceName = "localhost_" + (START_PORT + i);
       try {
         // the call fails on getClusterManagerForParticipant()
         // no threads start
-        result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
+        newParticipant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+        newParticipant.syncStart();
       } catch (Exception e) {
         e.printStackTrace();
       }
     }
 
     Thread.sleep(30000);
-    TestHelper.verifyWithTimeout("verifyNotConnected", 30 * 1000, result._manager);
+    TestHelper.verifyWithTimeout("verifyNotConnected", 30 * 1000, newParticipant);
 
     logger.info("STOP TestParticipantNameCollision() at " + new Date(System.currentTimeMillis()));
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
index d900d98..a1c413b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPauseSignal.java
@@ -24,12 +24,12 @@ import java.util.Date;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.PauseSignal;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -47,7 +47,7 @@ public class TestPauseSignal extends ZkIntegrationTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -59,14 +59,15 @@ public class TestPauseSignal extends ZkIntegrationTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -112,13 +113,11 @@ public class TestPauseSignal extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // clean up
+    controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }
 
-    Thread.sleep(2000);
-    controller.syncStop();
-
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestRedefineStateModelDef.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRedefineStateModelDef.java b/helix-core/src/test/java/org/apache/helix/integration/TestRedefineStateModelDef.java
index be2de65..f2af156 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRedefineStateModelDef.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRedefineStateModelDef.java
@@ -9,10 +9,10 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.StateModelDefinition;
@@ -49,15 +49,16 @@ public class TestRedefineStateModelDef extends ZkUnitTestBase {
     autoRebalance(clusterName);
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -70,7 +71,7 @@ public class TestRedefineStateModelDef extends ZkUnitTestBase {
     // stop controller, redefine state model definition, and re-start controller
     controller.syncStop();
     redefineStateModelDef(clusterName);
-    controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    controller = new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     result =

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
index e04cc79..d3a370d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRenamePartition.java
@@ -23,16 +23,17 @@ import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -40,6 +41,14 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestRenamePartition extends ZkIntegrationTestBase {
+  // map from clusterName to participants
+  final Map<String, MockParticipantManager[]> _participantMap =
+      new ConcurrentHashMap<String, MockParticipantManager[]>();
+
+  // map from clusterName to controllers
+  final Map<String, ClusterControllerManager> _controllerMap =
+      new ConcurrentHashMap<String, ClusterControllerManager>();
+
   @Test()
   public void testRenamePartitionAutoIS() throws Exception {
     String clusterName = "CLUSTER_" + getShortClassName() + "_auto";
@@ -72,8 +81,8 @@ public class TestRenamePartition extends ZkIntegrationTestBase {
             ZK_ADDR, clusterName));
     Assert.assertTrue(result);
 
+    stop(clusterName);
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
   }
 
   @Test()
@@ -104,7 +113,7 @@ public class TestRenamePartition extends ZkIntegrationTestBase {
     idealState.setStateModelDefId(StateModelDefId.from("MasterSlave"));
 
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
@@ -119,23 +128,25 @@ public class TestRenamePartition extends ZkIntegrationTestBase {
         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
             ZK_ADDR, clusterName));
     Assert.assertTrue(result);
+
+    stop(clusterName);
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
   }
 
   private void startAndVerify(String clusterName) throws Exception {
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
-    TestHelper
-        .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
 
     // start participants
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
-      // new Thread(participants[i]).start();
     }
 
     boolean result =
@@ -143,5 +154,21 @@ public class TestRenamePartition extends ZkIntegrationTestBase {
             ZK_ADDR, clusterName));
     Assert.assertTrue(result);
 
+    _participantMap.put(clusterName, participants);
+    _controllerMap.put(clusterName, controller);
+  }
+
+  private void stop(String clusterName) {
+    ClusterControllerManager controller = _controllerMap.get(clusterName);
+    if (controller != null) {
+      controller.syncStop();
+    }
+
+    MockParticipantManager[] participants = _participantMap.get(clusterName);
+    if (participants != null) {
+      for (MockParticipantManager participant : participants) {
+        participant.syncStop();
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
index 72920a8..7159a17 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetInstance.java
@@ -25,9 +25,9 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.TestHelper;
-import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.ErrTransition;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
@@ -54,7 +54,8 @@ public class TestResetInstance extends ZkIntegrationTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
@@ -65,16 +66,15 @@ public class TestResetInstance extends ZkIntegrationTestBase {
     };
 
     // start mock participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
       if (i == 0) {
-        participants[i] =
-            new MockParticipant(clusterName, instanceName, ZK_ADDR,
-                new ErrTransition(errPartitions));
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participants[i].setTransition(new ErrTransition(errPartitions));
       } else {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       }
       participants[i].syncStart();
     }
@@ -102,8 +102,6 @@ public class TestResetInstance extends ZkIntegrationTestBase {
     Assert.assertTrue(result, "Cluster verification fails");
 
     // clean up
-    // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
index 9479cff..af1ef13 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetPartitionState.java
@@ -31,9 +31,9 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.api.State;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.ErrTransition;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterSetup;
@@ -81,7 +81,8 @@ public class TestResetPartitionState extends ZkIntegrationTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
@@ -92,16 +93,16 @@ public class TestResetPartitionState extends ZkIntegrationTestBase {
     };
 
     // start mock participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
       if (i == 0) {
         participants[i] =
-            new MockParticipant(clusterName, instanceName, ZK_ADDR,
-                new ErrTransition(errPartitions));
+            new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participants[i].setTransition(new ErrTransition(errPartitions));
       } else {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       }
       participants[i].syncStart();
     }
@@ -170,8 +171,6 @@ public class TestResetPartitionState extends ZkIntegrationTestBase {
     Assert.assertEquals(_errToOfflineInvoked, 2, "Should reset 2 partitions");
 
     // clean up
-    // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
index f8b4dc9..46a05d8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResetResource.java
@@ -25,9 +25,9 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.TestHelper;
-import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.ErrTransition;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
@@ -53,7 +53,8 @@ public class TestResetResource extends ZkIntegrationTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
@@ -64,16 +65,15 @@ public class TestResetResource extends ZkIntegrationTestBase {
     };
 
     // start mock participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
       if (i == 0) {
-        participants[i] =
-            new MockParticipant(clusterName, instanceName, ZK_ADDR,
-                new ErrTransition(errPartitions));
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participants[i].setTransition(new ErrTransition(errPartitions));
       } else {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       }
       participants[i].syncStart();
     }
@@ -101,8 +101,6 @@ public class TestResetResource extends ZkIntegrationTestBase {
     Assert.assertTrue(result, "Cluster verification fails");
 
     // clean up
-    // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
index 008782c..64043ed 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRestartParticipant.java
@@ -24,8 +24,8 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -35,15 +35,15 @@ import org.testng.annotations.Test;
 
 public class TestRestartParticipant extends ZkIntegrationTestBase {
   public class KillOtherTransition extends MockTransition {
-    final AtomicReference<MockParticipant> _other;
+    final AtomicReference<MockParticipantManager> _other;
 
-    public KillOtherTransition(MockParticipant other) {
-      _other = new AtomicReference<MockParticipant>(other);
+    public KillOtherTransition(MockParticipantManager other) {
+      _other = new AtomicReference<MockParticipantManager>(other);
     }
 
     @Override
     public void doTransition(Message message, NotificationContext context) {
-      MockParticipant other = _other.getAndSet(null);
+      MockParticipantManager other = _other.getAndSet(null);
       if (other != null) {
         System.err.println("Kill " + other.getInstanceName()
             + ". Interrupted exceptions are IGNORABLE");
@@ -58,7 +58,7 @@ public class TestRestartParticipant extends ZkIntegrationTestBase {
     System.out.println("START testRestartParticipant at " + new Date(System.currentTimeMillis()));
 
     String clusterName = getShortClassName();
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -69,19 +69,19 @@ public class TestRestartParticipant extends ZkIntegrationTestBase {
         3, // replicas
         "MasterSlave", true); // do rebalance
 
-    TestHelper
-        .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
     // start participants
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
       if (i == 4) {
-        participants[i] =
-            new MockParticipant(clusterName, instanceName, ZK_ADDR, new KillOtherTransition(
-                participants[0]));
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participants[i].setTransition(new KillOtherTransition(participants[0]));
       } else {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
-        // Thread.sleep(100);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       }
 
       participants[i].syncStart();
@@ -94,9 +94,9 @@ public class TestRestartParticipant extends ZkIntegrationTestBase {
 
     // restart
     Thread.sleep(500);
-    MockParticipant participant =
-        new MockParticipant(participants[0].getClusterName(), participants[0].getInstanceName(),
-            ZK_ADDR, null);
+    MockParticipantManager participant =
+        new MockParticipantManager(ZK_ADDR, participants[0].getClusterName(),
+            participants[0].getInstanceName());
     System.err.println("Restart " + participant.getInstanceName());
     participant.syncStart();
     result =
@@ -104,6 +104,13 @@ public class TestRestartParticipant extends ZkIntegrationTestBase {
             clusterName));
     Assert.assertTrue(result);
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+    }
+    participant.syncStop();
+
     System.out.println("START testRestartParticipant at " + new Date(System.currentTimeMillis()));
 
   }