You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/10/25 03:21:18 UTC

[05/10] [HELIX-279] Apply gc handling fixes to main ZKHelixManager class

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
index 7f004d3..b00e26c 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
@@ -40,10 +40,13 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+  private static Logger LOG = Logger.getLogger(TestAlertActionTriggering.class);
+
   String _statName = "TestStat@DB=db1";
   String _stat = "TestStat";
   String metricName1 = "TestMetric1";
@@ -51,8 +54,7 @@ public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropert
 
   void setHealthData(int[] val1, int[] val2) {
     for (int i = 0; i < NODE_NR; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+      HelixManager manager = _participants[i];
       ZNRecord record = new ZNRecord(_stat);
       Map<String, String> valMap = new HashMap<String, String>();
       valMap.put(metricName1, val1[i] + "");
@@ -68,15 +70,13 @@ public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropert
     try {
       Thread.sleep(1000);
     } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error("sleep interrupted", e);
     }
   }
 
   void setHealthData2(int[] val1) {
     for (int i = 0; i < NODE_NR; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+      HelixManager manager = _participants[i];
       ZNRecord record = new ZNRecord(_stat);
       Map<String, String> valMap = new HashMap<String, String>();
       valMap.put(metricName2, val1[i] + "");
@@ -91,8 +91,7 @@ public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropert
     try {
       Thread.sleep(1000);
     } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error("sleep interrupted", e);
     }
   }
 
@@ -127,11 +126,9 @@ public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropert
     };
     setHealthData(metrics1, metrics2);
 
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    HelixManager manager = _controller;
 
-    HealthStatsAggregator task =
-        new HealthStatsAggregator(_startCMResultMap.get(controllerName)._manager);
+    HealthStatsAggregator task = new HealthStatsAggregator(manager);
     task.aggregate();
     Thread.sleep(4000);
     HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
index 125f61f..c18b643 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
@@ -31,12 +31,15 @@ import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
+import org.apache.helix.HelixTimerTask;
+import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
 import org.apache.helix.model.AlertHistory;
 import org.apache.helix.model.HealthStat;
 import org.apache.helix.model.HelixConfigScope;
+import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -46,6 +49,8 @@ import org.testng.annotations.Test;
  */
 
 public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+  private final static Logger LOG = Logger.getLogger(TestAlertFireHistory.class);
+
   String _statName = "TestStat@DB=db1";
   String _stat = "TestStat";
   String metricName1 = "TestMetric1";
@@ -57,8 +62,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
 
   void setHealthData(int[] val1, int[] val2) {
     for (int i = 0; i < NODE_NR; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+      HelixManager manager = _participants[i];
       ZNRecord record = new ZNRecord(_stat);
       Map<String, String> valMap = new HashMap<String, String>();
       valMap.put(metricName1, val1[i] + "");
@@ -74,13 +78,12 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
     try {
       Thread.sleep(1000);
     } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error("Interrupted sleep", e);
     }
   }
 
   @Test
-  public void TestAlertDisable() throws InterruptedException {
+  public void testAlertDisable() throws InterruptedException {
 
     int[] metrics1 = {
         10, 15, 22, 24, 16
@@ -90,29 +93,27 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
     };
     setHealthData(metrics1, metrics2);
 
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    HelixManager manager = _controller;
     manager.startTimerTasks();
 
     _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr1);
     _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr2);
 
-    // ConfigScope scope = new ConfigScopeBuilder().forCluster(CLUSTER_NAME).build();
     HelixConfigScope scope =
         new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(CLUSTER_NAME).build();
     Map<String, String> properties = new HashMap<String, String>();
     properties.put("healthChange.enabled", "false");
     _setupTool.getClusterManagementTool().setConfig(scope, properties);
 
-    HealthStatsAggregator task =
-        new HealthStatsAggregator(_startCMResultMap.get(controllerName)._manager);
+    HealthStatsAggregator task = new HealthStatsAggregator(_controller);
+
     task.aggregate();
     Thread.sleep(100);
     HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
     Builder keyBuilder = helixDataAccessor.keyBuilder();
 
     AlertHistory history = manager.getHelixDataAccessor().getProperty(keyBuilder.alertHistory());
-    //
+
     Assert.assertEquals(history, null);
 
     properties.put("healthChange.enabled", "true");
@@ -128,7 +129,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
   }
 
   @Test
-  public void TestAlertHistory() throws InterruptedException {
+  public void testAlertHistory() throws InterruptedException {
     int[] metrics1 = {
         10, 15, 22, 24, 16
     };
@@ -137,9 +138,10 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
     };
     setHealthData(metrics1, metrics2);
 
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
-    manager.stopTimerTasks();
+    HelixManager manager = _controller;
+    for (HelixTimerTask task : _controller.getControllerTimerTasks()) {
+      task.stop();
+    }
 
     _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr1);
     _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr2);
@@ -154,8 +156,8 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
       historySize = property.getRecord().getMapFields().size();
     }
 
-    HealthStatsAggregator task =
-        new HealthStatsAggregator(_startCMResultMap.get(controllerName)._manager);
+    HealthStatsAggregator task = new HealthStatsAggregator(_controller);
+
     task.aggregate();
     Thread.sleep(100);
 
@@ -421,3 +423,4 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
   }
 
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
index c5f373c..b8bd634 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
@@ -29,10 +29,10 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.HealthStat;
 import org.apache.helix.model.Message;
@@ -73,7 +73,7 @@ public class TestDummyAlerts extends ZkIntegrationTestBase {
     String clusterName = className + "_" + methodName;
     final int n = 5;
 
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
@@ -95,15 +95,16 @@ public class TestDummyAlerts extends ZkIntegrationTestBase {
             "EXP(decay(1.0)(*.defaultPerfCounters@defaultPerfCounters.availableCPUs))CMP(GREATER)CON(2)");
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] =
-          new MockParticipant(clusterName, instanceName, ZK_ADDR, new DummyAlertsTransition());
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].setTransition(new DummyAlertsTransition());
       participants[i].syncStart();
     }
 
@@ -137,7 +138,6 @@ public class TestDummyAlerts extends ZkIntegrationTestBase {
     }
 
     // clean up
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
index 69d1062..69b52e7 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
@@ -29,18 +29,14 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.alerts.AlertValueAndStatus;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.healthcheck.HealthStatsAggregationTask;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
 import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterSetup;
@@ -51,7 +47,6 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestExpandAlert extends ZkIntegrationTestBase {
-  ZkClient _zkClient;
   protected ClusterSetup _setupTool = null;
   protected final String _alertStr =
       "EXP(decay(1.0)(localhost_*.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(16)";
@@ -61,15 +56,12 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
 
   @BeforeClass()
   public void beforeClass() throws Exception {
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
 
-    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool = new ClusterSetup(_gZkClient);
   }
 
   @AfterClass
   public void afterClass() {
-    _zkClient.close();
   }
 
   public class ExpandAlertTransition extends MockTransition {
@@ -120,7 +112,7 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
   @Test()
   public void testExpandAlert() throws Exception {
     String clusterName = getShortClassName();
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     System.out.println("START TestExpandAlert at " + new Date(System.currentTimeMillis()));
 
@@ -136,18 +128,19 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
 
     _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
 
-    StartCMResult cmResult =
-        TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
-            HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
     // start participants
     for (int i = 0; i < 5; i++) // !!!change back to 5
     {
       String instanceName = "localhost_" + (12918 + i);
 
       participants[i] =
-          new MockParticipant(clusterName, instanceName, ZK_ADDR, new ExpandAlertTransition());
-      participants[i].start();
-      // new Thread(participants[i]).start();
+          new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].setTransition(new ExpandAlertTransition());
+      participants[i].syncStart();
     }
 
     boolean result =
@@ -158,13 +151,14 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
     Thread.sleep(1000);
     // HealthAggregationTask is supposed to run by a timer every 30s
     // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
-    new HealthStatsAggregator(cmResult._manager).aggregate();
+    // new HealthStatsAggregator(cmResult._manager).aggregate();
+    new HealthStatsAggregator(controller).aggregate();
     // sleep for a few seconds to give stats stage time to trigger
     Thread.sleep(3000);
 
     // other verifications go here
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     // for (int i = 0; i < 1; i++) //change 1 back to 5
@@ -181,6 +175,12 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
     Assert.assertFalse(fired);
     // }
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+
+    }
     System.out.println("END TestExpandAlert at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
index 1db5ddd..ccc0a79 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
@@ -29,18 +29,14 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.alerts.AlertValueAndStatus;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.healthcheck.HealthStatsAggregationTask;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
 import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterSetup;
@@ -51,7 +47,6 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestSimpleAlert extends ZkIntegrationTestBase {
-  ZkClient _zkClient;
   protected ClusterSetup _setupTool = null;
   protected final String _alertStr =
       "EXP(decay(1.0)(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
@@ -60,15 +55,11 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
 
   @BeforeClass()
   public void beforeClass() throws Exception {
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
-
-    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool = new ClusterSetup(_gZkClient);
   }
 
   @AfterClass
   public void afterClass() {
-    _zkClient.close();
   }
 
   public class SimpleAlertTransition extends MockTransition {
@@ -125,7 +116,7 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
   @Test()
   public void testSimpleAlert() throws Exception {
     String clusterName = getShortClassName();
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     System.out.println("START TestSimpleAlert at " + new Date(System.currentTimeMillis()));
 
@@ -140,10 +131,11 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
 
     // enableHealthCheck(clusterName);
 
-    StartCMResult cmResult =
-        TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
-            HelixControllerMain.STANDALONE);
-    cmResult._manager.startTimerTasks();
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+    controller.startTimerTasks();
+
     _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
     // start participants
     for (int i = 0; i < 5; i++) // !!!change back to 5
@@ -151,9 +143,9 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
       String instanceName = "localhost_" + (12918 + i);
 
       participants[i] =
-          new MockParticipant(clusterName, instanceName, ZK_ADDR, new SimpleAlertTransition(15));
+          new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].setTransition(new SimpleAlertTransition(15));
       participants[i].syncStart();
-      // new Thread(participants[i]).start();
     }
 
     boolean result =
@@ -163,13 +155,14 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
 
     // HealthAggregationTask is supposed to run by a timer every 30s
     // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
-    new HealthStatsAggregator(cmResult._manager).aggregate();
+    // new HealthStatsAggregator(cmResult._manager).aggregate();
+    new HealthStatsAggregator(controller).aggregate();
     // sleep for a few seconds to give stats stage time to trigger
     Thread.sleep(3000);
 
     // other verifications go here
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
     // for (int i = 0; i < 1; i++) //change 1 back to 5
     // {
@@ -197,6 +190,11 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
             .equals("ON"));
     // }
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+    }
     System.out.println("END TestSimpleAlert at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
index c5b55da..417a53a 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
@@ -28,29 +28,27 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.alerts.AlertValueAndStatus;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.healthcheck.HealthStatsAggregationTask;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
 import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
-  ZkClient _zkClient;
+  private static Logger LOG = Logger.getLogger(TestSimpleWildcardAlert.class);
+
   protected ClusterSetup _setupTool = null;
   protected final String _alertStr =
       "EXP(decay(1.0)(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
@@ -59,15 +57,12 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
 
   @BeforeClass()
   public void beforeClass() throws Exception {
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
 
-    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool = new ClusterSetup(_gZkClient);
   }
 
   @AfterClass
   public void afterClass() {
-    _zkClient.close();
   }
 
   public class SimpleAlertTransition extends MockTransition {
@@ -124,7 +119,7 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
   @Test()
   public void testSimpleWildcardAlert() throws Exception {
     String clusterName = getShortClassName();
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     System.out.println("START testSimpleWildcardAlert at " + new Date(System.currentTimeMillis()));
 
@@ -139,10 +134,10 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
 
     // enableHealthCheck(clusterName);
 
-    StartCMResult cmResult =
-        TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
-            HelixControllerMain.STANDALONE);
-    cmResult._manager.stopTimerTasks();
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+    controller.stopTimerTasks();
 
     String alertwildcard =
         "EXP(decay(1.0)(localhost*.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
@@ -154,9 +149,9 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
       String instanceName = "localhost_" + (12944 + i);
 
       participants[i] =
-          new MockParticipant(clusterName, instanceName, ZK_ADDR, new SimpleAlertTransition(i * 5));
+          new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].setTransition(new SimpleAlertTransition(i * 5));
       participants[i].syncStart();
-      // new Thread(participants[i]).start();
     }
 
     boolean result =
@@ -167,13 +162,13 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
     Thread.sleep(1000);
     // HealthAggregationTask is supposed to run by a timer every 30s
     // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
-    new HealthStatsAggregator(cmResult._manager).aggregate();
+    new HealthStatsAggregator(controller).aggregate();
     // sleep for a few seconds to give stats stage time to trigger
     Thread.sleep(1000);
 
     // other verifications go here
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
     ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
     Map<String, Map<String, String>> recMap = record.getMapFields();
@@ -209,7 +204,7 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
     alertwildcard =
         "EXP(decay(1.0)(localhost*.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(15)";
     _setupTool.getClusterManagementTool().addAlert(clusterName, alertwildcard);
-    new HealthStatsAggregator(cmResult._manager).aggregate();
+    new HealthStatsAggregator(controller).aggregate();
     Thread.sleep(1000);
 
     record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
@@ -241,6 +236,11 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
       Assert.assertTrue(delta.get(alertString).equals("ON"));
     }
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+    }
     System.out.println("END testSimpleWildcardAlert at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
index 2304b41..cdb7d1d 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
@@ -29,18 +29,14 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.alerts.AlertValueAndStatus;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.healthcheck.HealthStatsAggregationTask;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
 import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterSetup;
@@ -51,7 +47,6 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestStalenessAlert extends ZkIntegrationTestBase {
-  ZkClient _zkClient;
   protected ClusterSetup _setupTool = null;
   protected final String _alertStr = "EXP(decay(1)(localhost_*.reportingage))CMP(GREATER)CON(600)";
   protected final String _alertStatusStr = _alertStr + " : (localhost_12918.reportingage)";
@@ -59,15 +54,12 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
 
   @BeforeClass()
   public void beforeClass() throws Exception {
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
 
-    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool = new ClusterSetup(_gZkClient);
   }
 
   @AfterClass
   public void afterClass() {
-    _zkClient.close();
   }
 
   public class StalenessAlertTransition extends MockTransition {
@@ -118,7 +110,7 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
   @Test()
   public void testStalenessAlert() throws Exception {
     String clusterName = getShortClassName();
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     System.out.println("START TestStalenessAlert at " + new Date(System.currentTimeMillis()));
 
@@ -134,18 +126,19 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
 
     _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
 
-    StartCMResult cmResult =
-        TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
-            HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
     // start participants
     for (int i = 0; i < 5; i++) // !!!change back to 5
     {
       String instanceName = "localhost_" + (12918 + i);
 
       participants[i] =
-          new MockParticipant(clusterName, instanceName, ZK_ADDR, new StalenessAlertTransition());
+          new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].setTransition(new StalenessAlertTransition());
       participants[i].syncStart();
-      // new Thread(participants[i]).start();
     }
 
     boolean result =
@@ -155,13 +148,13 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
 
     // HealthAggregationTask is supposed to run by a timer every 30s
     // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
-    new HealthStatsAggregator(cmResult._manager).aggregate();
+    new HealthStatsAggregator(controller).aggregate();
     // sleep for a few seconds to give stats stage time to trigger
     Thread.sleep(3000);
 
     // other verifications go here
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
     // for (int i = 0; i < 1; i++) //change 1 back to 5
     // {
@@ -177,6 +170,11 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
     // Assert.assertFalse(fired);
     // }
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+    }
     System.out.println("END TestStalenessAlert at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
index a0456a7..cc819de 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
@@ -44,18 +44,14 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.alerts.AlertValueAndStatus;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.healthcheck.HealthStatsAggregationTask;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
 import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.Message;
 import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
@@ -126,7 +122,6 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
   }
 
   private static final Logger _logger = Logger.getLogger(TestWildcardAlert.class);
-  ZkClient _zkClient;
   protected ClusterSetup _setupTool = null;
   protected final String _alertStr =
       "EXP(decay(1)(localhost_*.RestQueryStats@DBName=TestDB0.latency)|EXPAND|SUMEACH)CMP(GREATER)CON(10)";
@@ -135,15 +130,12 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
 
   @BeforeClass()
   public void beforeClass() throws Exception {
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
 
-    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool = new ClusterSetup(_gZkClient);
   }
 
   @AfterClass
   public void afterClass() {
-    _zkClient.close();
   }
 
   public class WildcardAlertTransition extends MockTransition {
@@ -209,7 +201,7 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
   @Test()
   public void testWildcardAlert() throws Exception {
     String clusterName = getShortClassName();
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     System.out.println("START TestWildcardAlert at " + new Date(System.currentTimeMillis()));
 
@@ -228,18 +220,18 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
     _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
     // _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr2);
 
-    StartCMResult cmResult =
-        TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
-            HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
     // start participants
     for (int i = 0; i < 5; i++) // !!!change back to 5
     {
       String instanceName = "localhost_" + (12918 + i);
 
       participants[i] =
-          new MockParticipant(clusterName, instanceName, ZK_ADDR, new WildcardAlertTransition());
+          new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].setTransition(new WildcardAlertTransition());
       participants[i].syncStart();
-      // new Thread(participants[i]).start();
     }
 
     TestClusterMBeanObserver jmxMBeanObserver =
@@ -252,13 +244,13 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
     Thread.sleep(3000);
     // HealthAggregationTask is supposed to run by a timer every 30s
     // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
-    new HealthStatsAggregator(cmResult._manager).aggregate();
+    new HealthStatsAggregator(controller).aggregate();
 
     // sleep for a few seconds to give stats stage time to trigger and for bean to trigger
     Thread.sleep(3000);
 
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     // for (int i = 0; i < 1; i++) //change 1 back to 5
@@ -293,6 +285,12 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
             "EXP(decay(1)(localhost_%.RestQueryStats@DBName#TestDB0.latency)|EXPAND|SUMEACH)CMP(GREATER)CON(10)--(%)");
     // }
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+    }
+
     System.out.println("END TestWildcardAlert at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
index 8547666..32fdcff 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
@@ -20,16 +20,9 @@ package org.apache.helix.integration;
  */
 
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.helix.PropertyType;
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.model.PauseSignal;
+
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
@@ -46,13 +39,15 @@ public class TestAddClusterV2 extends ZkIntegrationTestBase {
   protected static final int START_PORT = 12918;
   protected static final String STATE_MODEL = "MasterSlave";
   protected ClusterSetup _setupTool = null;
-  protected Map<String, StartCMResult> _startCMResultMap = new HashMap<String, StartCMResult>();
 
   protected final String CLASS_NAME = getShortClassName();
   protected final String CONTROLLER_CLUSTER = CONTROLLER_CLUSTER_PREFIX + "_" + CLASS_NAME;
 
   protected static final String TEST_DB = "TestDB";
 
+  MockParticipantManager[] _participants = new MockParticipantManager[NODE_NR];
+  ClusterDistributedController[] _distControllers = new ClusterDistributedController[NODE_NR];
+
   @BeforeClass
   public void beforeClass() throws Exception {
     System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
@@ -90,29 +85,18 @@ public class TestAddClusterV2 extends ZkIntegrationTestBase {
         "MasterSlave", 3, true);
 
     // start dummy participants for the first cluster
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < NODE_NR; i++) {
       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      if (_startCMResultMap.get(instanceName) != null) {
-        LOG.error("fail to start participant:" + instanceName
-            + "(participant with the same name already running");
-      } else {
-        StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, firstCluster, instanceName);
-        _startCMResultMap.put(instanceName, result);
-      }
+      _participants[i] = new MockParticipantManager(ZK_ADDR, firstCluster, instanceName);
+      _participants[i].syncStart();
     }
 
     // start distributed cluster controllers
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < NODE_NR; i++) {
       String controllerName = CONTROLLER_PREFIX + "_" + i;
-      if (_startCMResultMap.get(controllerName) != null) {
-        LOG.error("fail to start controller:" + controllerName
-            + "(controller with the same name already running");
-      } else {
-        StartCMResult result =
-            TestHelper.startController(CONTROLLER_CLUSTER, controllerName, ZK_ADDR,
-                HelixControllerMain.DISTRIBUTED);
-        _startCMResultMap.put(controllerName, result);
-      }
+      _distControllers[i] =
+          new ClusterDistributedController(ZK_ADDR, CONTROLLER_CLUSTER, controllerName);
+      _distControllers[i].syncStart();
     }
 
     verifyClusters();
@@ -134,36 +118,22 @@ public class TestAddClusterV2 extends ZkIntegrationTestBase {
      * 3) disconnect leader/disconnect participant
      */
     String leader = getCurrentLeader(_gZkClient, CONTROLLER_CLUSTER);
-    // pauseController(_startCMResultMap.get(leader)._manager.getDataAccessor());
-
-    StartCMResult result;
-
-    Iterator<Entry<String, StartCMResult>> it = _startCMResultMap.entrySet().iterator();
-
-    while (it.hasNext()) {
-      String instanceName = it.next().getKey();
-      if (!instanceName.equals(leader) && instanceName.startsWith(CONTROLLER_PREFIX)) {
-        result = _startCMResultMap.get(instanceName);
-        result._manager.disconnect();
-        result._thread.interrupt();
-        it.remove();
+    int leaderIdx = -1;
+    for (int i = 0; i < NODE_NR; i++) {
+      if (!_distControllers[i].getInstanceName().equals(leader)) {
+        _distControllers[i].syncStop();
+        verifyClusters();
+      } else {
+        leaderIdx = i;
       }
-      verifyClusters();
     }
+    Assert.assertNotSame(leaderIdx, -1);
 
-    result = _startCMResultMap.remove(leader);
-    result._manager.disconnect();
-    result._thread.interrupt();
-
-    it = _startCMResultMap.entrySet().iterator();
-    while (it.hasNext()) {
-      String instanceName = it.next().getKey();
-      result = _startCMResultMap.get(instanceName);
-      result._manager.disconnect();
-      result._thread.interrupt();
-      it.remove();
-    }
+    _distControllers[leaderIdx].syncStop();
 
+    for (int i = 0; i < NODE_NR; i++) {
+      _participants[i].syncStop();
+    }
     System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
index b135d92..79d8b89 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
@@ -21,18 +21,15 @@ package org.apache.helix.integration;
 
 import java.util.Date;
 import java.util.List;
-import java.util.Set;
 
-import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
 import org.apache.helix.ZkTestHelper;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
@@ -52,16 +49,17 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, "localhost", "TestDB", 1, 20, nodeNr - 1,
         3, "MasterSlave", true);
 
-    MockParticipant[] participants = new MockParticipant[nodeNr];
+    MockParticipantManager[] participants = new MockParticipantManager[nodeNr];
     for (int i = 0; i < nodeNr - 1; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
-      new Thread(participants[i]).start();
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
     }
 
-    ZkHelixTestManager controller =
-        new ZkHelixTestManager(clusterName, "controller_0", InstanceType.CONTROLLER, ZK_ADDR);
-    controller.connect();
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
     boolean result;
     result =
         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
@@ -75,7 +73,7 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
     _gSetupTool.addInstanceToCluster(clusterName, "localhost_12922");
     _gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 3);
 
-    participants[nodeNr - 1] = new MockParticipant(clusterName, "localhost_12922", ZK_ADDR);
+    participants[nodeNr - 1] = new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12922");
     new Thread(participants[nodeNr - 1]).start();
     result =
         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
@@ -86,11 +84,10 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // clean up
-    // controller.disconnect();
-    // for (int i = 0; i < nodeNr; i++)
-    // {
-    // participants[i].syncStop();
-    // }
+    controller.syncStop();
+    for (int i = 0; i < nodeNr; i++) {
+      participants[i].syncStop();
+    }
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
@@ -101,11 +98,13 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
     // setup grand cluster
-    TestHelper.setupCluster("GRAND_" + clusterName, ZK_ADDR, 0, "controller", null, 0, 0, 1, 0,
+    final String grandClusterName = "GRAND_" + clusterName;
+    TestHelper.setupCluster(grandClusterName, ZK_ADDR, 0, "controller", null, 0, 0, 1, 0,
         null, true);
 
-    TestHelper.startController("GRAND_" + clusterName, "controller_0", ZK_ADDR,
-        HelixControllerMain.DISTRIBUTED);
+    ClusterDistributedController distController =
+        new ClusterDistributedController(ZK_ADDR, grandClusterName, "controller_0");
+    distController.syncStart();
 
     // setup cluster
     _gSetupTool.addCluster(clusterName, true);
@@ -127,12 +126,11 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
     _gSetupTool.addResourceToCluster(clusterName, "TestDB0", 1, "LeaderStandby");
     _gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 1);
 
-    MockParticipant[] participants = new MockParticipant[nodeNr];
+    MockParticipantManager[] participants = new MockParticipantManager[nodeNr];
     for (int i = 0; i < nodeNr - 1; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
-      // new Thread(participants[i]).start();
     }
 
     result =
@@ -150,10 +148,8 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
     _gSetupTool.addInstanceToCluster(clusterName, "localhost_12919");
     _gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 2);
 
-    participants[nodeNr - 1] = new MockParticipant(clusterName, "localhost_12919", ZK_ADDR);
+    participants[nodeNr - 1] = new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12919");
     participants[nodeNr - 1].syncStart();
-    // new Thread(participants[nodeNr - 1]).start();
-
     result =
         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
             ZK_ADDR, clusterName));
@@ -165,10 +161,10 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
     Assert.assertEquals(numberOfListeners, 2); // 1 of participant, and 1 of controller
 
     // clean up
-    // for (int i = 0; i < nodeNr; i++)
-    // {
-    // participants[i].syncStop();
-    // }
+    distController.syncStop();
+    for (int i = 0; i < nodeNr; i++) {
+      participants[i].syncStop();
+    }
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
index 33938ad..cd888d7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
@@ -19,20 +19,17 @@ package org.apache.helix.integration;
  * under the License.
  */
 
-import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.MockMSModelFactory;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterSetup;
@@ -52,7 +49,7 @@ public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -63,14 +60,15 @@ public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
         3, // replicas
         "MasterSlave", true); // do rebalance
 
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -116,7 +114,7 @@ public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
     // register "TestDB1_Factory" state model factory
     // Logger.getRootLogger().setLevel(Level.INFO);
     for (int i = 0; i < n; i++) {
-      participants[i].getManager().getStateMachineEngine()
+      participants[i].getStateMachineEngine()
           .registerStateModelFactory("MasterSlave", new MockMSModelFactory(), "TestDB1_Factory");
     }
 
@@ -127,7 +125,6 @@ public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
 
     // clean up
     // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
index 1ffb86f..bc1c1b0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
@@ -27,8 +27,8 @@ import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.DefaultIdealStateCalculator;
@@ -74,15 +74,16 @@ public class TestAutoIsWithEmptyMap extends ZkIntegrationTestBase {
     _gZkClient.writeData(idealPath, curIdealState);
 
     // start controller
-    TestHelper
-        .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
 
     // start participants
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -92,6 +93,7 @@ public class TestAutoIsWithEmptyMap extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // clean up
+    controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
index 1943364..b4f9223 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -29,11 +29,10 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ExternalView;
@@ -51,18 +50,17 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
   String db2 = TEST_DB + "2";
   String _tag = "SSDSSD";
 
+  @Override
   @BeforeClass
   public void beforeClass() throws Exception {
     // Logger.getRootLogger().setLevel(Level.INFO);
     System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
 
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
     String namespace = "/" + CLUSTER_NAME;
-    if (_zkClient.exists(namespace)) {
-      _zkClient.deleteRecursive(namespace);
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
     }
-    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool = new ClusterSetup(_gZkClient);
 
     // setup storage cluster
     _setupTool.addCluster(CLUSTER_NAME, true);
@@ -89,24 +87,21 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
     // start dummy participants
     for (int i = 0; i < NODE_NR; i++) {
       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      if (_startCMResultMap.get(instanceName) != null) {
-        LOG.error("fail to start particpant:" + instanceName
-            + "(participant with same name already exists)");
-      } else {
-        StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
-        _startCMResultMap.put(instanceName, result);
-      }
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      participant.syncStart();
+      _participants[i] = participant;
+
     }
 
     // start controller
     String controllerName = CONTROLLER_PREFIX + "_0";
-    StartCMResult startResult =
-        TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
-            HelixControllerMain.STANDALONE);
-    _startCMResultMap.put(controllerName, startResult);
+    _controller =
+        new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
 
     boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, TEST_DB));
 
     Assert.assertTrue(result);
@@ -122,7 +117,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", 1);
 
     boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, "MyDB"));
     Assert.assertTrue(result);
 
@@ -140,7 +135,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB2", 3);
 
     result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, "MyDB2"));
     Assert.assertTrue(result);
 
@@ -154,16 +149,11 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
 
   @Test()
   public void testAutoRebalance() throws Exception {
-
     // kill 1 node
-    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
-    _startCMResultMap.get(instanceName)._manager.disconnect();
-    Thread.currentThread().sleep(1000);
-    _startCMResultMap.get(instanceName)._thread.interrupt();
+    _participants[0].syncStop();
 
-    // verifyBalanceExternalView();
     boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, TEST_DB));
     Assert.assertTrue(result);
 
@@ -172,22 +162,22 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
       String storageNodeName = PARTICIPANT_PREFIX + "_" + (1000 + i);
       _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
 
-      StartCMResult resultx =
-          TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
-      _startCMResultMap.put(storageNodeName, resultx);
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
+      participant.syncStart();
     }
     Thread.sleep(1000);
     result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, TEST_DB));
     Assert.assertTrue(result);
 
     result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, db2));
     Assert.assertTrue(result);
     HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor(_zkClient));
+        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
     ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2));
     Set<String> instancesSet = new HashSet<String>();
@@ -242,12 +232,10 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
   }
 
   public static class ExternalViewBalancedVerifier implements ZkVerifier {
-    ZkClient _client;
     String _clusterName;
     String _resourceName;
 
     public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName) {
-      _client = client;
       _clusterName = clusterName;
       _resourceName = resourceName;
     }
@@ -255,7 +243,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
     @Override
     public boolean verify() {
       HelixDataAccessor accessor =
-          new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(_client));
+          new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
       Builder keyBuilder = accessor.keyBuilder();
       int numberOfPartitions =
           accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields()
@@ -286,7 +274,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
 
     @Override
     public ZkClient getZkClient() {
-      return _client;
+      return _gZkClient;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
index 32cafcf..74a5699 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
@@ -25,14 +25,12 @@ import java.util.Map;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ExternalView;
@@ -49,16 +47,15 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
   private static final Logger LOG = Logger.getLogger(TestAutoRebalancePartitionLimit.class
       .getName());
 
+  @Override
   @BeforeClass
   public void beforeClass() throws Exception {
     // Logger.getRootLogger().setLevel(Level.INFO);
     System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
 
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
     String namespace = "/" + CLUSTER_NAME;
-    if (_zkClient.exists(namespace)) {
-      _zkClient.deleteRecursive(namespace);
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
     }
     _setupTool = new ClusterSetup(ZK_ADDR);
 
@@ -75,40 +72,33 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
 
     // start controller
     String controllerName = CONTROLLER_PREFIX + "_0";
-    StartCMResult startResult =
-        TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
-            HelixControllerMain.STANDALONE);
-    _startCMResultMap.put(controllerName, startResult);
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
 
-    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    HelixManager manager = _controller; // _startCMResultMap.get(controllerName)._manager;
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
     // start dummy participants
     for (int i = 0; i < NODE_NR; i++) {
       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      if (_startCMResultMap.get(instanceName) != null) {
-        LOG.error("fail to start particpant:" + instanceName
-            + "(participant with same name already exists)");
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants[i].syncStart();
+      Thread.sleep(2000);
+      boolean result =
+          ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
+              CLUSTER_NAME, TEST_DB));
+      Assert.assertTrue(result);
+      ExternalView ev =
+          manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
+      System.out.println(ev.getPartitionSet().size());
+      if (i < 3) {
+        Assert.assertEquals(ev.getPartitionSet().size(), 25 * (i + 1));
       } else {
-        startResult = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
-        _startCMResultMap.put(instanceName, startResult);
-        Thread.sleep(2000);
-        boolean result =
-            ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
-                CLUSTER_NAME, TEST_DB));
-        Assert.assertTrue(result);
-        ExternalView ev =
-            manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
-        System.out.println(ev.getPartitionSet().size());
-        if (i < 3) {
-          Assert.assertEquals(ev.getPartitionSet().size(), 25 * (i + 1));
-        } else {
-          Assert.assertEquals(ev.getPartitionSet().size(), 100);
-        }
+        Assert.assertEquals(ev.getPartitionSet().size(), 100);
       }
     }
 
     boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, TEST_DB));
 
     Assert.assertTrue(result);
@@ -116,17 +106,13 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
 
   @Test()
   public void testAutoRebalanceWithMaxPartitionPerNode() throws Exception {
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    HelixManager manager = _controller;
     // kill 1 node
-    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
-    _startCMResultMap.get(instanceName)._manager.disconnect();
-    Thread.currentThread().sleep(1000);
-    _startCMResultMap.get(instanceName)._thread.interrupt();
+    _participants[0].syncStop();
 
     // verifyBalanceExternalView();
     boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, TEST_DB));
     Assert.assertTrue(result);
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
@@ -134,14 +120,11 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
         manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
     Assert.assertEquals(ev.getPartitionSet().size(), 100);
 
-    instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 1);
-    _startCMResultMap.get(instanceName)._manager.disconnect();
-    Thread.currentThread().sleep(1000);
-    _startCMResultMap.get(instanceName)._thread.interrupt();
+    _participants[1].syncStop();
 
     // verifyBalanceExternalView();
     result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, TEST_DB));
     Assert.assertTrue(result);
     ev = manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
@@ -152,13 +135,15 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
       String storageNodeName = PARTICIPANT_PREFIX + "_" + (1000 + i);
       _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
 
-      StartCMResult resultx =
-          TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
-      _startCMResultMap.put(storageNodeName, resultx);
+      String newInstanceName = storageNodeName.replace(':', '_');
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newInstanceName);
+      participant.syncStart();
     }
+
     Thread.sleep(1000);
     result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, TEST_DB));
     Assert.assertTrue(result);
   }
@@ -209,12 +194,10 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
   }
 
   public static class ExternalViewBalancedVerifier implements ZkVerifier {
-    ZkClient _client;
     String _clusterName;
     String _resourceName;
 
     public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName) {
-      _client = client;
       _clusterName = clusterName;
       _resourceName = resourceName;
     }
@@ -222,7 +205,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
     @Override
     public boolean verify() {
       HelixDataAccessor accessor =
-          new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(_client));
+          new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
       Builder keyBuilder = accessor.keyBuilder();
       int numberOfPartitions =
           accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields()
@@ -240,7 +223,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
 
     @Override
     public ZkClient getZkClient() {
-      return _client;
+      return _gZkClient;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
index bf2de1e..03fc85b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java
@@ -30,13 +30,10 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.HelixProperty.HelixPropertyAttribute;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZkTestHelper;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.ErrTransition;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
@@ -91,15 +88,16 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
     TestZkChildListener listener = new TestZkChildListener();
     _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
 
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -112,7 +110,6 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
 
     // clean up
     // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < n; i++) {
       participants[i].syncStop();
@@ -141,15 +138,16 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
         2, // replicas
         "MasterSlave", true); // do rebalance
 
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -159,7 +157,6 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // stop all participants
-    Thread.sleep(1000);
     for (int i = 0; i < n; i++) {
       participants[i].syncStop();
     }
@@ -180,7 +177,7 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -193,7 +190,6 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
 
     // clean up
     // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < n; i++) {
       participants[i].syncStop();
@@ -209,10 +205,9 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
     String clusterName = className + "_" + methodName;
 
     final int n = 5;
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-    // ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, "localhost", "TestDB", 1, // resource#
         6, // partition#
@@ -228,19 +223,20 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
     idealState.setBatchMessageMode(true);
     accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
 
-    TestHelper
-        .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
       if (i == 1) {
         Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
         errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
-        participants[i] =
-            new MockParticipant(clusterName, instanceName, ZK_ADDR,
-                new ErrTransition(errPartitions));
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participants[i].setTransition(new ErrTransition(errPartitions));
       } else {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       }
       participants[i].syncStart();
     }
@@ -297,7 +293,8 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
     TestZkChildListener listener = new TestZkChildListener();
     _gZkClient.subscribeChildChanges(keyBuilder.messages("localhost_12918").getPath(), listener);
 
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // pause controller
@@ -307,11 +304,11 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
     });
 
     // start participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -335,7 +332,6 @@ public class TestBatchMessage extends ZkIntegrationTestBase {
 
     // clean up
     // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < n; i++) {
       participants[i].syncStop();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
index 2ae8bf3..9bfdfb5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageWrapper.java
@@ -26,12 +26,12 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.messaging.handling.BatchMessageWrapper;
-import org.apache.helix.mock.controller.ClusterController;
 import org.apache.helix.mock.participant.MockMSModelFactory;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -90,17 +90,19 @@ public class TestBatchMessageWrapper extends ZkUnitTestBase {
     idealState.setBatchMessageMode(true);
     accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
 
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     TestMockMSModelFactory[] ftys = new TestMockMSModelFactory[n];
 
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
       ftys[i] = new TestMockMSModelFactory();
-      participants[i] = new MockParticipant(ftys[i], clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", ftys[i]);
       participants[i].syncStart();
 
       // wait for each participant to complete state transitions, so we have deterministic results
@@ -133,6 +135,12 @@ public class TestBatchMessageWrapper extends ZkUnitTestBase {
     Assert.assertEquals(wrapper._startCount, 2,
         "Expect 2 batch.end: O->S and S->M for 2nd participant");
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+    }
+
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
index 8e75537..207a318 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java
@@ -24,10 +24,10 @@ import java.util.Date;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -45,7 +45,7 @@ public class TestBucketizedResource extends ZkIntegrationTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
     // ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
@@ -66,13 +66,15 @@ public class TestBucketizedResource extends ZkIntegrationTestBase {
     idealState.setBucketSize(1);
     accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
 
-    TestHelper
-        .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
     // start participants
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -87,6 +89,7 @@ public class TestBucketizedResource extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // clean up
+    controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }