You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/11/20 22:12:20 UTC

[05/52] [abbrv] [HELIX-279] Apply gc handling fixes to ZKHelixManager

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
index 9af6ca8..af079c1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
@@ -30,12 +30,11 @@ import org.apache.helix.HelixProperty.HelixPropertyAttribute;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.ErrTransition;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.tools.ClusterSetup;
@@ -89,15 +88,16 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
     TestZkChildListener listener = new TestZkChildListener();
     _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
 
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -110,7 +110,6 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
 
     // clean up
     // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < n; i++) {
       participants[i].syncStop();
@@ -139,15 +138,16 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
         2, // replicas
         "MasterSlave", true); // do rebalance
 
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -157,7 +157,6 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // stop all participants
-    Thread.sleep(1000);
     for (int i = 0; i < n; i++) {
       participants[i].syncStop();
     }
@@ -178,7 +177,7 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -191,7 +190,6 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
 
     // clean up
     // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < n; i++) {
       participants[i].syncStop();
@@ -207,10 +205,9 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
     String clusterName = className + "_" + methodName;
 
     final int n = 5;
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-    // ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, "localhost", "TestDB", 1, // resource#
         6, // partition#
@@ -229,19 +226,20 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
     final String hostToFail = "localhost_12921";
     final String partitionToFail = "TestDB0_4";
 
-    TestHelper
-        .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
       if (instanceName.equals(hostToFail)) {
         Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
-        errPartitions.put("SLAVE-MASTER", TestHelper.setOf(partitionToFail));
-        participants[i] =
-            new MockParticipant(clusterName, instanceName, ZK_ADDR,
-                new ErrTransition(errPartitions));
+        errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participants[i].setTransition(new ErrTransition(errPartitions));
       } else {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       }
       participants[i].syncStart();
     }
@@ -298,7 +296,8 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
     TestZkChildListener listener = new TestZkChildListener();
     _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
 
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // pause controller
@@ -308,11 +307,11 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
     });
 
     // start participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -336,7 +335,6 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
 
     // clean up
     // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < n; i++) {
       participants[i].syncStop();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
index 2ae8bf3..9bfdfb5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
@@ -26,12 +26,12 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.messaging.handling.BatchMessageWrapper;
-import org.apache.helix.mock.controller.ClusterController;
 import org.apache.helix.mock.participant.MockMSModelFactory;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -90,17 +90,19 @@ public class TestBatchMessageWrapper extends ZkUnitTestBase {
     idealState.setBatchMessageMode(true);
     accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
 
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     TestMockMSModelFactory[] ftys = new TestMockMSModelFactory[n];
 
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
       ftys[i] = new TestMockMSModelFactory();
-      participants[i] = new MockParticipant(ftys[i], clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", ftys[i]);
       participants[i].syncStart();
 
       // wait for each participant to complete state transitions, so we have deterministic results
@@ -133,6 +135,12 @@ public class TestBatchMessageWrapper extends ZkUnitTestBase {
     Assert.assertEquals(wrapper._startCount, 2,
         "Expect 2 batch.end: O->S and S->M for 2nd participant");
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+    }
+
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
index 6edca76..51b0d3c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
@@ -24,10 +24,10 @@ import java.util.Date;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -45,7 +45,7 @@ public class TestBucketizedResource extends ZkIntegrationTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
     // ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
@@ -66,13 +66,15 @@ public class TestBucketizedResource extends ZkIntegrationTestBase {
     idealState.setBucketSize(1);
     accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
 
-    TestHelper
-        .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
     // start participants
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -87,6 +89,7 @@ public class TestBucketizedResource extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // clean up
+    controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java b/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
index 9d63a65..97b6ebb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java
@@ -25,8 +25,8 @@ import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
@@ -39,7 +39,7 @@ public class TestCarryOverBadCurState extends ZkIntegrationTestBase {
     System.out.println("START testCarryOverBadCurState at " + new Date(System.currentTimeMillis()));
 
     String clusterName = getShortClassName();
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -58,13 +58,15 @@ public class TestCarryOverBadCurState extends ZkIntegrationTestBase {
     _gZkClient.createPersistent(path, true);
     _gZkClient.writeData(path, badCurState);
 
-    TestHelper
-        .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
     // start participants
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -78,6 +80,11 @@ public class TestCarryOverBadCurState extends ZkIntegrationTestBase {
             clusterName));
     Assert.assertTrue(result);
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+    }
     System.out.println("END testCarryOverBadCurState at " + new Date(System.currentTimeMillis()));
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java b/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
index 49c5576..93c58eb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java
@@ -26,11 +26,11 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -61,15 +61,16 @@ public class TestCleanupExternalView extends ZkUnitTestBase {
         2, // replicas
         "MasterSlave", true); // do rebalance
 
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -84,7 +85,7 @@ public class TestCleanupExternalView extends ZkUnitTestBase {
     admin.enableCluster(clusterName, false);
     // wait all pending zk-events being processed, otherwise remove current-state will cause
     // controller send O->S message
-    ZkTestHelper.tryWaitZkEventsCleaned(controller.getManager().getZkClient());
+    ZkTestHelper.tryWaitZkEventsCleaned(controller.getZkClient());
     // System.out.println("paused controller");
 
     // drop resource
@@ -121,8 +122,13 @@ public class TestCleanupExternalView extends ZkUnitTestBase {
     Assert.assertNull(externalView, "external-view for TestDB0 should be removed, but was: "
         + externalView);
 
-    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+    }
 
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java b/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
index c0ced72..e844a27 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java
@@ -26,7 +26,6 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyType;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.util.HelixUtil;
 import org.testng.Assert;
@@ -40,8 +39,8 @@ public class TestClusterStartsup extends ZkStandAloneCMTestBase {
     System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
 
     String namespace = "/" + CLUSTER_NAME;
-    if (_zkClient.exists(namespace)) {
-      _zkClient.deleteRecursive(namespace);
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
     }
     _setupTool = new ClusterSetup(ZK_ADDR);
 
@@ -58,13 +57,12 @@ public class TestClusterStartsup extends ZkStandAloneCMTestBase {
   @Override
   @BeforeClass()
   public void beforeClass() throws Exception {
-    _zkClient = new ZkClient(ZK_ADDR);
+
   }
 
   @Override
   @AfterClass()
   public void afterClass() {
-    _zkClient.close();
   }
 
   @Test()
@@ -72,9 +70,8 @@ public class TestClusterStartsup extends ZkStandAloneCMTestBase {
     setupCluster();
     String controllerMsgPath =
         HelixUtil.getControllerPropertyPath(CLUSTER_NAME, PropertyType.MESSAGES_CONTROLLER);
-    _zkClient.deleteRecursive(controllerMsgPath);
+    _gZkClient.deleteRecursive(controllerMsgPath);
     HelixManager manager = null;
-    ;
 
     try {
       manager =
@@ -106,7 +103,7 @@ public class TestClusterStartsup extends ZkStandAloneCMTestBase {
 
     setupCluster();
     String stateModelPath = HelixUtil.getStateModelDefinitionPath(CLUSTER_NAME);
-    _zkClient.deleteRecursive(stateModelPath);
+    _gZkClient.deleteRecursive(stateModelPath);
 
     try {
       manager =
@@ -125,7 +122,7 @@ public class TestClusterStartsup extends ZkStandAloneCMTestBase {
     String instanceStatusUpdatePath =
         HelixUtil.getInstancePropertyPath(CLUSTER_NAME, "localhost_" + (START_PORT + 1),
             PropertyType.STATUSUPDATES);
-    _zkClient.deleteRecursive(instanceStatusUpdatePath);
+    _gZkClient.deleteRecursive(instanceStatusUpdatePath);
 
     try {
       manager =

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java
index 43cfa5a..5467932 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomIdealState.java
@@ -32,18 +32,6 @@ import org.testng.annotations.Test;
 
 public class TestCustomIdealState extends ZkIntegrationTestBase {
   private static Logger LOG = Logger.getLogger(TestCustomIdealState.class);
-  ZkClient _zkClient;
-
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
-  }
-
-  @AfterClass
-  public void afterClass() {
-    _zkClient.close();
-  }
 
   @Test
   public void testBasic() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
index 2c0badc..ce26e2e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java
@@ -105,12 +105,12 @@ public class TestCustomizedIdealStateRebalancer extends
     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, 3);
 
     boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, db2));
     Assert.assertTrue(result);
     Thread.sleep(1000);
     HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
     ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2));
     Assert.assertEquals(ev.getPartitionSet().size(), 60);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
index f6ddd34..cc7fd5f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
@@ -27,10 +27,10 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkTestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.tools.ClusterSetup;
@@ -52,7 +52,7 @@ public class TestDisable extends ZkIntegrationTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -72,14 +72,15 @@ public class TestDisable extends ZkIntegrationTestBase {
     accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -120,7 +121,6 @@ public class TestDisable extends ZkIntegrationTestBase {
 
     // clean up
     // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
@@ -140,7 +140,7 @@ public class TestDisable extends ZkIntegrationTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -152,14 +152,15 @@ public class TestDisable extends ZkIntegrationTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -200,7 +201,6 @@ public class TestDisable extends ZkIntegrationTestBase {
 
     // clean up
     // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
@@ -219,7 +219,7 @@ public class TestDisable extends ZkIntegrationTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -239,14 +239,15 @@ public class TestDisable extends ZkIntegrationTestBase {
     accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -292,7 +293,6 @@ public class TestDisable extends ZkIntegrationTestBase {
 
     // clean up
     // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
@@ -311,7 +311,7 @@ public class TestDisable extends ZkIntegrationTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -323,14 +323,15 @@ public class TestDisable extends ZkIntegrationTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -376,7 +377,6 @@ public class TestDisable extends ZkIntegrationTestBase {
 
     // clean up
     // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
index 93c765c..be523d0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableNode.java
@@ -38,7 +38,7 @@ public class TestDisableNode extends ZkStandAloneCMTestBaseWithPropertyServerChe
             ZK_ADDR, CLUSTER_NAME));
     Assert.assertTrue(result);
 
-    ZKHelixAdmin tool = new ZKHelixAdmin(_zkClient);
+    ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
     tool.enableInstance(CLUSTER_NAME, PARTICIPANT_PREFIX + "_12918", true);
 
     result =

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
index 3067a0b..ba7e8e4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisablePartition.java
@@ -56,7 +56,7 @@ public class TestDisablePartition extends ZkStandAloneCMTestBaseWithPropertyServ
 
     TestHelper.verifyState(CLUSTER_NAME, ZK_ADDR, map, "OFFLINE");
 
-    ZKHelixAdmin tool = new ZKHelixAdmin(_zkClient);
+    ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
     tool.enablePartition(true, CLUSTER_NAME, "localhost_12919", "TestDB", Arrays.asList("TestDB_9"));
 
     result =

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java
index 3155dd4..8135191 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedCMMain.java
@@ -25,10 +25,10 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -77,11 +77,10 @@ public class TestDistributedCMMain extends ZkIntegrationTestBase {
         "LeaderStandby", true); // do rebalance
 
     // start distributed cluster controllers
-    ClusterController[] controllers = new ClusterController[n + n];
+    ClusterDistributedController[] controllers = new ClusterDistributedController[n + n];
     for (int i = 0; i < n; i++) {
       controllers[i] =
-          new ClusterController(controllerClusterName, "controller_" + i, ZK_ADDR,
-              HelixControllerMain.DISTRIBUTED.toString());
+          new ClusterDistributedController(ZK_ADDR, controllerClusterName, "controller_" + i);
       controllers[i].syncStart();
     }
 
@@ -92,11 +91,11 @@ public class TestDistributedCMMain extends ZkIntegrationTestBase {
     Assert.assertTrue(result, "Controller cluster NOT in ideal state");
 
     // start first cluster
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     final String firstClusterName = clusterNamePrefix + "0_0";
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost0_" + (12918 + i);
-      participants[i] = new MockParticipant(firstClusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, firstClusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -114,8 +113,7 @@ public class TestDistributedCMMain extends ZkIntegrationTestBase {
     setupTool.rebalanceStorageCluster(controllerClusterName, clusterNamePrefix + "0", 6);
     for (int i = n; i < 2 * n; i++) {
       controllers[i] =
-          new ClusterController(controllerClusterName, "controller_" + i, ZK_ADDR,
-              HelixControllerMain.DISTRIBUTED.toString());
+          new ClusterDistributedController(ZK_ADDR, controllerClusterName, "controller_" + i);
       controllers[i].syncStart();
     }
 
@@ -157,7 +155,6 @@ public class TestDistributedCMMain extends ZkIntegrationTestBase {
     // clean up
     // wait for all zk callbacks done
     System.out.println("Cleaning up...");
-    Thread.sleep(2000);
     for (int i = 0; i < 5; i++) {
       result =
           ClusterStateVerifier
@@ -166,7 +163,6 @@ public class TestDistributedCMMain extends ZkIntegrationTestBase {
       controllers[i].syncStop();
     }
 
-    // Thread.sleep(2000);
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
index 672df10..f3def23 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDistributedClusterController.java
@@ -25,10 +25,10 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -76,11 +76,10 @@ public class TestDistributedClusterController extends ZkIntegrationTestBase {
         "LeaderStandby", true); // do rebalance
 
     // start distributed cluster controllers
-    ClusterController[] controllers = new ClusterController[n];
+    ClusterDistributedController[] controllers = new ClusterDistributedController[n];
     for (int i = 0; i < n; i++) {
       controllers[i] =
-          new ClusterController(controllerClusterName, "controller_" + i, ZK_ADDR,
-              HelixControllerMain.DISTRIBUTED.toString());
+          new ClusterDistributedController(ZK_ADDR, controllerClusterName, "controller_" + i);
       controllers[i].syncStart();
     }
 
@@ -91,11 +90,11 @@ public class TestDistributedClusterController extends ZkIntegrationTestBase {
     Assert.assertTrue(result, "Controller cluster NOT in ideal state");
 
     // start first cluster
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     final String firstClusterName = clusterNamePrefix + "0_0";
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost0_" + (12918 + i);
-      participants[i] = new MockParticipant(firstClusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, firstClusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -114,11 +113,11 @@ public class TestDistributedClusterController extends ZkIntegrationTestBase {
     controllers[j].syncStop();
 
     // setup the second cluster
-    MockParticipant[] participants2 = new MockParticipant[n];
+    MockParticipantManager[] participants2 = new MockParticipantManager[n];
     final String secondClusterName = clusterNamePrefix + "0_1";
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost1_" + (12918 + i);
-      participants2[i] = new MockParticipant(secondClusterName, instanceName, ZK_ADDR, null);
+      participants2[i] = new MockParticipantManager(ZK_ADDR, secondClusterName, instanceName);
       participants2[i].syncStart();
     }
 
@@ -130,7 +129,6 @@ public class TestDistributedClusterController extends ZkIntegrationTestBase {
     // clean up
     // wait for all zk callbacks done
     System.out.println("Cleaning up...");
-    Thread.sleep(1000);
     for (int i = 0; i < 5; i++) {
       result =
           ClusterStateVerifier

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java b/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
index b29e25d..2f2431b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDriver.java
@@ -21,21 +21,18 @@ package org.apache.helix.integration;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.IdealState.IdealStateProperty;
@@ -46,7 +43,6 @@ import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.TestCommand;
 import org.apache.helix.tools.TestCommand.CommandType;
-import org.apache.helix.tools.TestCommand.NodeOpArg;
 import org.apache.helix.tools.TestExecutor;
 import org.apache.helix.tools.TestExecutor.ZnodePropertyType;
 import org.apache.helix.tools.TestTrigger;
@@ -79,10 +75,8 @@ public class TestDriver {
     public final int _numNode;
     public final int _replica;
 
-    // public final Map<String, ZNRecord> _idealStateMap = new
-    // ConcurrentHashMap<String, ZNRecord>();
-    public final Map<String, StartCMResult> _startCMResultMap =
-        new ConcurrentHashMap<String, StartCMResult>();
+    public final Map<String, HelixManager> _managers =
+        new ConcurrentHashMap<String, HelixManager>();
 
     public TestInfo(String clusterName, ZkClient zkClient, int numDb, int numPartitionsPerDb,
         int numNode, int replica) {
@@ -118,10 +112,6 @@ public class TestDriver {
         replica, true);
   }
 
-  // public static void setupCluster(String uniqTestName, ZkClient zkClient, int
-  // numDb,
-  // int numPartitionPerDb, int numNodes, int replica, boolean doRebalance)
-  // throws Exception
   public static void setupCluster(String uniqClusterName, String zkAddr, int numResources,
       int numPartitionsPerResource, int numInstances, int replica, boolean doRebalance)
       throws Exception {
@@ -193,11 +183,15 @@ public class TestDriver {
     for (int id : instanceIds) {
       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + id);
 
-      if (testInfo._startCMResultMap.containsKey(instanceName)) {
+      // if (testInfo._startCMResultMap.containsKey(instanceName)) {
+      if (testInfo._managers.containsKey(instanceName)) {
         LOG.warn("Dummy participant:" + instanceName + " has already started; skip starting it");
       } else {
-        StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, clusterName, instanceName);
-        testInfo._startCMResultMap.put(instanceName, result);
+        // StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, clusterName, instanceName);
+        MockParticipantManager participant =
+            new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participant.syncStart();
+        testInfo._managers.put(instanceName, participant);
         // testInfo._instanceStarted.countDown();
       }
     }
@@ -220,13 +214,13 @@ public class TestDriver {
 
     for (int id : nodeIds) {
       String controllerName = CONTROLLER_PREFIX + "_" + id;
-      if (testInfo._startCMResultMap.containsKey(controllerName)) {
+      if (testInfo._managers.containsKey(controllerName)) {
         LOG.warn("Controller:" + controllerName + " has already started; skip starting it");
       } else {
-        StartCMResult result =
-            TestHelper.startController(clusterName, controllerName, ZK_ADDR,
-                HelixControllerMain.STANDALONE);
-        testInfo._startCMResultMap.put(controllerName, result);
+        ClusterControllerManager controller =
+            new ClusterControllerManager(ZK_ADDR, clusterName, controllerName);
+        controller.syncStart();
+        testInfo._managers.put(controllerName, controller);
       }
     }
   }
@@ -257,27 +251,22 @@ public class TestDriver {
     TestInfo testInfo = _testInfoMap.remove(uniqClusterName);
 
     // stop controller first
-    for (Iterator<Entry<String, StartCMResult>> it =
-        testInfo._startCMResultMap.entrySet().iterator(); it.hasNext();) {
-      Map.Entry<String, StartCMResult> entry = it.next();
-      String instanceName = entry.getKey();
+    for (String instanceName : testInfo._managers.keySet()) {
       if (instanceName.startsWith(CONTROLLER_PREFIX)) {
-        it.remove();
-        HelixManager manager = entry.getValue()._manager;
-        manager.disconnect();
-        Thread thread = entry.getValue()._thread;
-        thread.interrupt();
+        ClusterControllerManager controller =
+            (ClusterControllerManager) testInfo._managers.get(instanceName);
+        controller.syncStop();
       }
     }
 
     Thread.sleep(1000);
 
-    // stop the rest
-    for (Map.Entry<String, StartCMResult> entry : testInfo._startCMResultMap.entrySet()) {
-      HelixManager manager = entry.getValue()._manager;
-      manager.disconnect();
-      Thread thread = entry.getValue()._thread;
-      thread.interrupt();
+    for (String instanceName : testInfo._managers.keySet()) {
+      if (!instanceName.startsWith(CONTROLLER_PREFIX)) {
+        MockParticipantManager participant =
+            (MockParticipantManager) testInfo._managers.get(instanceName);
+        participant.syncStop();
+      }
     }
 
     testInfo._zkClient.close();
@@ -292,23 +281,24 @@ public class TestDriver {
     }
 
     TestInfo testInfo = _testInfoMap.get(uniqClusterName);
-    // String clusterName = testInfo._clusterName;
 
     String failHost = PARTICIPANT_PREFIX + "_" + (START_PORT + instanceId);
-    StartCMResult result = testInfo._startCMResultMap.remove(failHost);
+    MockParticipantManager participant =
+        (MockParticipantManager) testInfo._managers.remove(failHost);
 
     // TODO need sync
-    if (result == null || result._manager == null || result._thread == null) {
+    if (participant == null) {
       String errMsg = "Dummy participant:" + failHost + " seems not running";
       LOG.error(errMsg);
     } else {
       // System.err.println("try to stop participant: " +
       // result._manager.getInstanceName());
-      NodeOpArg arg = new NodeOpArg(result._manager, result._thread);
-      TestCommand command = new TestCommand(CommandType.STOP, new TestTrigger(beginTime), arg);
-      List<TestCommand> commandList = new ArrayList<TestCommand>();
-      commandList.add(command);
-      TestExecutor.executeTestAsync(commandList, ZK_ADDR);
+      // NodeOpArg arg = new NodeOpArg(result._manager, result._thread);
+      // TestCommand command = new TestCommand(CommandType.STOP, new TestTrigger(beginTime), arg);
+      // List<TestCommand> commandList = new ArrayList<TestCommand>();
+      // commandList.add(command);
+      // TestExecutor.executeTestAsync(commandList, ZK_ADDR);
+      participant.syncStop();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java b/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
index f7a75a6..a81e35b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
@@ -26,15 +26,17 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.ErrTransition;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.helix.tools.ClusterSetup;
@@ -45,7 +47,7 @@ import org.testng.annotations.Test;
 
 public class TestDrop extends ZkIntegrationTestBase {
   @Test
-  public void testDropErrorPartitionAutoIS() throws Exception {
+  public void testDropResourceWithErrorPartitionSemiAuto() throws Exception {
     // Logger.getRootLogger().setLevel(Level.INFO);
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
@@ -54,7 +56,7 @@ public class TestDrop extends ZkIntegrationTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -66,7 +68,8 @@ public class TestDrop extends ZkIntegrationTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
@@ -78,11 +81,10 @@ public class TestDrop extends ZkIntegrationTestBase {
       String instanceName = "localhost_" + (12918 + i);
 
       if (i == 0) {
-        participants[i] =
-            new MockParticipant(clusterName, instanceName, ZK_ADDR, new ErrTransition(
-                errTransitions));
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participants[i].setTransition(new ErrTransition(errTransitions));
       } else {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       }
       participants[i].syncStart();
     }
@@ -108,19 +110,16 @@ public class TestDrop extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // clean up
-    // wait for all zk callbacks done
-    // Thread.sleep(1000);
-    // controller.syncStop();
-    // for (int i = 0; i < 5; i++)
-    // {
-    // participants[i].syncStop();
-    // }
+    controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+    }
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
   @Test
-  public void testDropErrorPartitionFailedAutoIS() throws Exception {
+  public void testFailToDropResourceWithErrorPartitionSemiAuto() throws Exception {
     // Logger.getRootLogger().setLevel(Level.INFO);
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
@@ -129,7 +128,7 @@ public class TestDrop extends ZkIntegrationTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -141,7 +140,8 @@ public class TestDrop extends ZkIntegrationTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
@@ -153,11 +153,10 @@ public class TestDrop extends ZkIntegrationTestBase {
       String instanceName = "localhost_" + (12918 + i);
 
       if (i == 0) {
-        participants[i] =
-            new MockParticipant(clusterName, instanceName, ZK_ADDR, new ErrTransition(
-                errTransitions));
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participants[i].setTransition(new ErrTransition(errTransitions));
       } else {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       }
       participants[i].syncStart();
     }
@@ -192,19 +191,16 @@ public class TestDrop extends ZkIntegrationTestBase {
     Assert.assertEquals(disabledPartitions.get(0), "TestDB0_4");
 
     // clean up
-    // wait for all zk callbacks done
-    // Thread.sleep(1000);
-    // controller.syncStop();
-    // for (int i = 0; i < 5; i++)
-    // {
-    // participants[i].syncStop();
-    // }
+    controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+    }
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
   @Test
-  public void testDropErrorPartitionCustomIS() throws Exception {
+  public void testDropResourceWithErrorPartitionCustom() throws Exception {
     // Logger.getRootLogger().setLevel(Level.INFO);
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
@@ -213,7 +209,7 @@ public class TestDrop extends ZkIntegrationTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -240,7 +236,8 @@ public class TestDrop extends ZkIntegrationTestBase {
     accessor.setProperty(keyBuiler.idealStates("TestDB0"), isBuilder.build());
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
@@ -251,11 +248,10 @@ public class TestDrop extends ZkIntegrationTestBase {
       String instanceName = "localhost_" + (12918 + i);
 
       if (i == 0) {
-        participants[i] =
-            new MockParticipant(clusterName, instanceName, ZK_ADDR, new ErrTransition(
-                errTransitions));
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participants[i].setTransition(new ErrTransition(errTransitions));
       } else {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       }
       participants[i].syncStart();
     }
@@ -279,6 +275,12 @@ public class TestDrop extends ZkIntegrationTestBase {
             clusterName));
     Assert.assertTrue(result, "Should be empty exeternal-view");
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+    }
+
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
@@ -292,7 +294,7 @@ public class TestDrop extends ZkIntegrationTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -304,14 +306,15 @@ public class TestDrop extends ZkIntegrationTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -351,13 +354,74 @@ public class TestDrop extends ZkIntegrationTestBase {
         "schemata externalView should be empty but was \"" + extView + "\"");
 
     // clean up
-    // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < n; i++) {
       participants[i].syncStop();
     }
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
+
+  /**
+   * Drop a single partition in a resource of semi-auto mode
+   */
+  @Test
+  public void testDropSinglePartitionSemiAuto() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        4, // partitions per resource
+        n, // number of nodes
+        2, // replicas
+        "MasterSlave", true); // do rebalance
+
+    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
+    // start participants
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // remove one partition from ideal-state should drop that partition
+    String partitionToDrop = "TestDB0_1";
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
+    idealState.getRecord().getListFields().remove(partitionToDrop);
+    idealState.getRecord().getMapFields().remove(partitionToDrop);
+    accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
+
+    result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    ExternalView externalView = accessor.getProperty(keyBuilder.externalView("TestDB0"));
+    Assert.assertFalse(externalView.getPartitionSet().contains(partitionToDrop),
+        "TestDB0_0 should be dropped since it's not in ideal-state");
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
index ba88370..0d02d12 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDropResource.java
@@ -20,7 +20,7 @@ package org.apache.helix.integration;
  */
 
 import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
@@ -59,9 +59,8 @@ public class TestDropResource extends ZkStandAloneCMTestBaseWithPropertyServerCh
 
     String hostToKill = "localhost_12920";
 
-    _startCMResultMap.get(hostToKill)._manager.disconnect();
+    _participants[2].syncStop();
     Thread.sleep(1000);
-    _startCMResultMap.get(hostToKill)._thread.interrupt();
 
     String command = "-zkSvr " + ZK_ADDR + " -dropResource " + CLUSTER_NAME + " " + "MyDB2";
     ClusterSetup.processCommandLineArgs(command.split(" "));
@@ -70,8 +69,8 @@ public class TestDropResource extends ZkStandAloneCMTestBaseWithPropertyServerCh
         TestHelper.<String> setOf("localhost_12918", "localhost_12919",
         /* "localhost_12920", */"localhost_12921", "localhost_12922"), ZK_ADDR);
 
-    StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, hostToKill);
-    _startCMResultMap.put(hostToKill, result);
+    _participants[2] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, hostToKill);
+    _participants[2].syncStart();
 
     TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView", 30 * 1000, CLUSTER_NAME, "MyDB2",
         TestHelper.<String> setOf("localhost_12918", "localhost_12919", "localhost_12920",

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestEnablePartitionDuringDisable.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestEnablePartitionDuringDisable.java b/helix-core/src/test/java/org/apache/helix/integration/TestEnablePartitionDuringDisable.java
index 616f63b..0291844 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestEnablePartitionDuringDisable.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestEnablePartitionDuringDisable.java
@@ -26,17 +26,20 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestEnablePartitionDuringDisable extends ZkIntegrationTestBase {
+  private static Logger LOG = Logger.getLogger(TestEnablePartitionDuringDisable.class);
+
   static {
     // Logger.getRootLogger().setLevel(Level.INFO);
   }
@@ -65,8 +68,7 @@ public class TestEnablePartitionDuringDisable extends ZkIntegrationTestBase {
 
             ClusterSetup.processCommandLineArgs(command.split("\\s+"));
           } catch (Exception e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+            LOG.error("Exception in cluster setup", e);
           }
 
         } else if (slaveToOfflineCnt > 0 && fromState.equals(State.from("OFFLINE"))
@@ -96,19 +98,21 @@ public class TestEnablePartitionDuringDisable extends ZkIntegrationTestBase {
         3, // replicas
         "MasterSlave", true); // do rebalance
 
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     EnablePartitionTransition transition = new EnablePartitionTransition();
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
       if (instanceName.equals("localhost_12919")) {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, transition);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participants[i].setTransition(transition);
       } else {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       }
       participants[i].syncStart();
     }
@@ -141,8 +145,6 @@ public class TestEnablePartitionDuringDisable extends ZkIntegrationTestBase {
     Assert.assertEquals(transition.offlineToSlave, 1, "should get 1 offlineToSlave transition");
 
     // clean up
-    // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
index 7e01c89..71c4339 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestErrorPartition.java
@@ -26,10 +26,10 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.TestHelper;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.mock.participant.ErrTransition;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -38,7 +38,7 @@ public class TestErrorPartition extends ZkIntegrationTestBase {
   @Test()
   public void testErrorPartition() throws Exception {
     String clusterName = getShortClassName();
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     System.out.println("START testErrorPartition() at " + new Date(System.currentTimeMillis()));
     ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
@@ -46,8 +46,10 @@ public class TestErrorPartition extends ZkIntegrationTestBase {
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, "localhost", "TestDB", 1, 10, 5, 3,
         "MasterSlave", true);
 
-    TestHelper
-        .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
@@ -57,14 +59,12 @@ public class TestErrorPartition extends ZkIntegrationTestBase {
             put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
           }
         };
-        participants[i] =
-            new MockParticipant(clusterName, instanceName, ZK_ADDR,
-                new ErrTransition(errPartitions));
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participants[i].setTransition(new ErrTransition(errPartitions));
       } else {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       }
       participants[i].syncStart();
-      // new Thread(participants[i]).start();
     }
 
     Map<String, Map<String, String>> errStates = new HashMap<String, Map<String, String>>();
@@ -113,7 +113,7 @@ public class TestErrorPartition extends ZkIntegrationTestBase {
         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
             ZK_ADDR, clusterName));
     Assert.assertTrue(result);
-    participants[0] = new MockParticipant(clusterName, "localhost_12918", ZK_ADDR);
+    participants[0] = new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12918");
     new Thread(participants[0]).start();
 
     result =
@@ -121,6 +121,12 @@ public class TestErrorPartition extends ZkIntegrationTestBase {
             ZK_ADDR, clusterName));
     Assert.assertTrue(result);
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+    }
+
     System.out.println("END testErrorPartition() at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestExternalViewUpdates.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestExternalViewUpdates.java b/helix-core/src/test/java/org/apache/helix/integration/TestExternalViewUpdates.java
index 9f4c596..3e31f17 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestExternalViewUpdates.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestExternalViewUpdates.java
@@ -26,9 +26,9 @@ import java.util.List;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
@@ -42,7 +42,7 @@ public class TestExternalViewUpdates extends ZkIntegrationTestBase {
     System.out.println("START testExternalViewUpdates at " + new Date(System.currentTimeMillis()));
 
     String clusterName = getShortClassName();
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
     int resourceNb = 10;
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -53,13 +53,15 @@ public class TestExternalViewUpdates extends ZkIntegrationTestBase {
         1, // replicas
         "MasterSlave", true); // do rebalance
 
-    TestHelper
-        .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
     // start participants
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -90,8 +92,11 @@ public class TestExternalViewUpdates extends ZkIntegrationTestBase {
       Assert.assertTrue(stat.getVersion() <= 2, "ExternalView should be updated at most 2 times");
     }
 
-    // TODO: need stop controller and participants
-
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+    }
     System.out.println("END testExternalViewUpdates at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
index b24f511..c7a1700 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
@@ -28,21 +28,22 @@ import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.mock.participant.MockJobIntf;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.participant.CustomCodeCallbackHandler;
 import org.apache.helix.participant.HelixCustomCodeRunner;
 import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase {
+  private static Logger LOG = Logger.getLogger(TestHelixCustomCodeRunner.class);
+
   private final String _clusterName = "CLUSTER_" + getShortClassName();
   private final int _nodeNb = 5;
   private final int _startPort = 12918;
@@ -75,8 +76,7 @@ public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase {
         customCodeRunner.invoke(_callback).on(ChangeType.LIVE_INSTANCE)
             .usingLeaderStandbyModel("TestParticLeader").start();
       } catch (Exception e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
+        LOG.error("Exception do pre-connect job", e);
       }
     }
 
@@ -101,15 +101,19 @@ public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase {
         _nodeNb, // replica
         "MasterSlave", true);
 
-    TestHelper.startController(_clusterName, "controller_0", ZK_ADDR,
-        HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
+    controller.syncStart();
 
-    MockParticipant[] partics = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
     for (int i = 0; i < _nodeNb; i++) {
       String instanceName = "localhost_" + (_startPort + i);
 
-      partics[i] = new MockParticipant(_clusterName, instanceName, ZK_ADDR, null, new MockJob());
-      partics[i].syncStart();
+      MockJob job = new MockJob();
+      participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
+
+      job.doPreConnectJob(participants[i]);
+      participants[i].syncStart();
     }
     boolean result =
         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
@@ -121,20 +125,26 @@ public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase {
     _callback._isCallbackInvoked = false;
 
     // add a new live instance
-    ZkClient zkClient = new ZkClient(ZK_ADDR);
-    zkClient.setZkSerializer(new ZNRecordSerializer());
+    // ZkClient zkClient = new ZkClient(ZK_ADDR);
+    // zkClient.setZkSerializer(new ZNRecordSerializer());
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+        new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     LiveInstance newLiveIns = new LiveInstance("newLiveInstance");
-    newLiveIns.setHelixVersion("0.0.0");
+    newLiveIns.setHelixVersion("0.6.0");
     newLiveIns.setSessionId("randomSessionId");
     accessor.setProperty(keyBuilder.liveInstance("newLiveInstance"), newLiveIns);
 
     Thread.sleep(1000); // wait for the CALLBACK type callback to finish
     Assert.assertTrue(_callback._isCallbackInvoked);
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < _nodeNb; i++) {
+      participants[i].syncStop();
+    }
+
     System.out.println("END " + _clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestHelixInstanceTag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixInstanceTag.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixInstanceTag.java
index 4484386..1f906d0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixInstanceTag.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixInstanceTag.java
@@ -33,8 +33,7 @@ import org.testng.annotations.Test;
 public class TestHelixInstanceTag extends ZkStandAloneCMTestBase {
   @Test
   public void testInstanceTag() throws Exception {
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    HelixManager manager = _controller;
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
 
     String DB2 = "TestDB2";

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestInstanceAutoJoin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestInstanceAutoJoin.java b/helix-core/src/test/java/org/apache/helix/integration/TestInstanceAutoJoin.java
index f69aeec..2761634 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestInstanceAutoJoin.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestInstanceAutoJoin.java
@@ -2,8 +2,7 @@ package org.apache.helix.integration;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.ConfigScope;
 import org.apache.helix.model.IdealState.RebalanceMode;
@@ -35,8 +34,7 @@ public class TestInstanceAutoJoin extends ZkStandAloneCMTestBase {
 
   @Test
   public void testInstanceAutoJoin() throws Exception {
-    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
-    HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+    HelixManager manager = _participants[0];
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
 
     _setupTool.addResourceToCluster(CLUSTER_NAME, db2, 60, "OnlineOffline", RebalanceMode.FULL_AUTO
@@ -44,10 +42,13 @@ public class TestInstanceAutoJoin extends ZkStandAloneCMTestBase {
 
     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, 1);
     String instance2 = "localhost_279699";
-    StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instance2);
+    // StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instance2);
+    MockParticipantManager newParticipant =
+        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instance2);
+    newParticipant.syncStart();
 
     Thread.sleep(500);
-    Assert.assertFalse(result._thread.isAlive());
+    // Assert.assertFalse(result._thread.isAlive());
     Assert.assertTrue(null == manager.getHelixDataAccessor().getProperty(
         accessor.keyBuilder().liveInstance(instance2)));
 
@@ -55,12 +56,11 @@ public class TestInstanceAutoJoin extends ZkStandAloneCMTestBase {
 
     manager.getConfigAccessor().set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "true");
 
-    result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instance2);
-
-    StartCMResult result2 = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instance2);
+    newParticipant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instance2);
+    newParticipant.syncStart();
 
     Thread.sleep(500);
-    Assert.assertTrue(result._thread.isAlive() || result2._thread.isAlive());
+    // Assert.assertTrue(result._thread.isAlive() || result2._thread.isAlive());
     for (int i = 0; i < 20; i++) {
       if (null == manager.getHelixDataAccessor().getProperty(
           accessor.keyBuilder().liveInstance(instance2))) {
@@ -71,9 +71,6 @@ public class TestInstanceAutoJoin extends ZkStandAloneCMTestBase {
     Assert.assertTrue(null != manager.getHelixDataAccessor().getProperty(
         accessor.keyBuilder().liveInstance(instance2)));
 
-    result._manager.disconnect();
-    result2._manager.disconnect();
-    result._thread.interrupt();
-    result2._thread.interrupt();
+    newParticipant.syncStop();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestInvalidAutoIdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestInvalidAutoIdealState.java b/helix-core/src/test/java/org/apache/helix/integration/TestInvalidAutoIdealState.java
index dbe183d..f2bf7f6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestInvalidAutoIdealState.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestInvalidAutoIdealState.java
@@ -29,9 +29,9 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
@@ -41,6 +41,7 @@ import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
 import org.apache.helix.tools.StateModelConfigGenerator;
 import org.testng.Assert;
+import org.testng.annotations.Test;
 
 // Helix-50: integration test for generate message based on state priority
 public class TestInvalidAutoIdealState extends ZkUnitTestBase {
@@ -91,15 +92,16 @@ public class TestInvalidAutoIdealState extends ZkUnitTestBase {
     admin.setResourceIdealState(clusterName, "TestDB", idealState);
 
     // start participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     boolean result =
@@ -108,7 +110,7 @@ public class TestInvalidAutoIdealState extends ZkUnitTestBase {
     Assert.assertTrue(result);
 
     // make sure localhost_12919 is master on TestDB_1
-    HelixDataAccessor accessor = controller.getManager().getHelixDataAccessor();
+    HelixDataAccessor accessor = controller.getHelixDataAccessor();
     Builder keyBuilder = accessor.keyBuilder();
     ExternalView extView = accessor.getProperty(keyBuilder.externalView(db));
     Map<String, String> stateMap = extView.getStateMap(db + "_1");

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java
index 268d6d0..bc7e972 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java
@@ -43,9 +43,9 @@ import org.testng.annotations.Test;
 public class TestMessagePartitionStateMismatch extends ZkStandAloneCMTestBase {
   @Test
   public void testStateMismatch() throws InterruptedException {
-    String controllerName = CONTROLLER_PREFIX + "_0";
+    // String controllerName = CONTROLLER_PREFIX + "_0";
 
-    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    HelixManager manager = _controller; // _startCMResultMap.get(controllerName)._manager;
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
     Builder kb = accessor.keyBuilder();
     ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));