You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/11/20 22:12:21 UTC

[06/52] [abbrv] [HELIX-279] Apply gc handling fixes to ZKHelixManager

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/controller/strategy/TestShufflingTwoStateStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestShufflingTwoStateStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestShufflingTwoStateStrategy.java
index 0269764..aea9b70 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestShufflingTwoStateStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestShufflingTwoStateStrategy.java
@@ -19,7 +19,6 @@ package org.apache.helix.controller.strategy;
  * under the License.
  */
 
-import java.io.IOException;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.util.ArrayList;
@@ -29,8 +28,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.ZNRecord;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.testng.Assert;
 import org.testng.AssertJUnit;
@@ -68,44 +65,33 @@ public class TestShufflingTwoStateStrategy {
 
     // ByteArrayOutputStream baos = new ByteArrayOutputStream();
     StringWriter sw = new StringWriter();
-    try {
-      mapper.writeValue(sw, result);
-      // System.out.println(sw.toString());
-
-      ZNRecord zn = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
-      System.out.println(result.toString());
-      System.out.println(zn.toString());
-      AssertJUnit.assertTrue(zn.toString().equalsIgnoreCase(result.toString()));
-      System.out.println();
-
-      sw = new StringWriter();
-      mapper.writeValue(sw, result2);
-
-      ZNRecord zn2 = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
-      System.out.println(result2.toString());
-      System.out.println(zn2.toString());
-      AssertJUnit.assertTrue(zn2.toString().equalsIgnoreCase(result2.toString()));
-
-      sw = new StringWriter();
-      mapper.writeValue(sw, result3);
-      System.out.println();
-
-      ZNRecord zn3 = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
-      System.out.println(result3.toString());
-      System.out.println(zn3.toString());
-      AssertJUnit.assertTrue(zn3.toString().equalsIgnoreCase(result3.toString()));
-      System.out.println();
-
-    } catch (JsonGenerationException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    } catch (JsonMappingException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    } catch (IOException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
+    mapper.writeValue(sw, result);
+    // System.out.println(sw.toString());
+
+    ZNRecord zn = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+    System.out.println(result.toString());
+    System.out.println(zn.toString());
+    AssertJUnit.assertTrue(zn.toString().equalsIgnoreCase(result.toString()));
+    System.out.println();
+
+    sw = new StringWriter();
+    mapper.writeValue(sw, result2);
+
+    ZNRecord zn2 = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+    System.out.println(result2.toString());
+    System.out.println(zn2.toString());
+    AssertJUnit.assertTrue(zn2.toString().equalsIgnoreCase(result2.toString()));
+
+    sw = new StringWriter();
+    mapper.writeValue(sw, result3);
+    System.out.println();
+
+    ZNRecord zn3 = mapper.readValue(new StringReader(sw.toString()), ZNRecord.class);
+    System.out.println(result3.toString());
+    System.out.println(zn3.toString());
+    AssertJUnit.assertTrue(zn3.toString().equalsIgnoreCase(result3.toString()));
+    System.out.println();
+
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
index e0e1544..cadbdc7 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
@@ -28,17 +28,15 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.State;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
 import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterSetup;
@@ -49,7 +47,6 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestAddDropAlert extends ZkIntegrationTestBase {
-  ZkClient _zkClient;
   protected ClusterSetup _setupTool = null;
   protected final String _alertStr =
       "EXP(accumulate()(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
@@ -58,15 +55,11 @@ public class TestAddDropAlert extends ZkIntegrationTestBase {
 
   @BeforeClass()
   public void beforeClass() throws Exception {
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
-
-    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool = new ClusterSetup(_gZkClient);
   }
 
   @AfterClass
   public void afterClass() {
-    _zkClient.close();
   }
 
   public class AddDropAlertTransition extends MockTransition {
@@ -114,7 +107,7 @@ public class TestAddDropAlert extends ZkIntegrationTestBase {
   @Test()
   public void testAddDropAlert() throws Exception {
     String clusterName = getShortClassName();
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     System.out.println("START TestAddDropAlert at " + new Date(System.currentTimeMillis()));
 
@@ -130,18 +123,18 @@ public class TestAddDropAlert extends ZkIntegrationTestBase {
 
     _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
 
-    StartCMResult cmResult =
-        TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
-            HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
     // start participants
     for (int i = 0; i < 5; i++) // !!!change back to 5
     {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] =
-          new MockParticipant(clusterName, instanceName, ZK_ADDR, new AddDropAlertTransition());
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].setTransition(new AddDropAlertTransition());
       participants[i].syncStart();
-      // new Thread(participants[i]).start();
     }
 
     boolean result =
@@ -152,10 +145,10 @@ public class TestAddDropAlert extends ZkIntegrationTestBase {
     // drop alert soon after adding, but leave enough time for alert to fire once
     // Thread.sleep(3000);
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
-    new HealthStatsAggregator(cmResult._manager).aggregate();
+    new HealthStatsAggregator(controller).aggregate();
     String instance = "localhost_12918";
     ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
     Map<String, Map<String, String>> recMap = record.getMapFields();
@@ -163,7 +156,7 @@ public class TestAddDropAlert extends ZkIntegrationTestBase {
     Assert.assertTrue(keySet.size() > 0);
 
     _setupTool.getClusterManagementTool().dropAlert(clusterName, _alertStr);
-    new HealthStatsAggregator(cmResult._manager).aggregate();
+    new HealthStatsAggregator(controller).aggregate();
     // other verifications go here
     // for (int i = 0; i < 1; i++) //change 1 back to 5
     // {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
index f1d2ba6..37f8205 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
@@ -38,10 +38,13 @@ import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+  private static Logger LOG = Logger.getLogger(TestAlertActionTriggering.class);
+
   String _statName = "TestStat@DB=db1";
   String _stat = "TestStat";
   String metricName1 = "TestMetric1";
@@ -49,8 +52,7 @@ public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropert
 
   void setHealthData(int[] val1, int[] val2) {
     for (int i = 0; i < NODE_NR; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+      HelixManager manager = _participants[i];
       ZNRecord record = new ZNRecord(_stat);
       Map<String, String> valMap = new HashMap<String, String>();
       valMap.put(metricName1, val1[i] + "");
@@ -66,15 +68,13 @@ public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropert
     try {
       Thread.sleep(1000);
     } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error("sleep interrupted", e);
     }
   }
 
   void setHealthData2(int[] val1) {
     for (int i = 0; i < NODE_NR; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+      HelixManager manager = _participants[i];
       ZNRecord record = new ZNRecord(_stat);
       Map<String, String> valMap = new HashMap<String, String>();
       valMap.put(metricName2, val1[i] + "");
@@ -89,8 +89,7 @@ public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropert
     try {
       Thread.sleep(1000);
     } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error("sleep interrupted", e);
     }
   }
 
@@ -125,11 +124,9 @@ public class TestAlertActionTriggering extends ZkStandAloneCMTestBaseWithPropert
     };
     setHealthData(metrics1, metrics2);
 
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    HelixManager manager = _controller;
 
-    HealthStatsAggregator task =
-        new HealthStatsAggregator(_startCMResultMap.get(controllerName)._manager);
+    HealthStatsAggregator task = new HealthStatsAggregator(manager);
     task.aggregate();
     Thread.sleep(4000);
     HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
index 24595d0..8618b1c 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
@@ -27,14 +27,16 @@ import java.util.TreeMap;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
-import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.HelixTimerTask;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.integration.ZkStandAloneCMTestBaseWithPropertyServerCheck;
 import org.apache.helix.model.AlertHistory;
 import org.apache.helix.model.HealthStat;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -44,6 +46,8 @@ import org.testng.annotations.Test;
  */
 
 public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
+  private final static Logger LOG = Logger.getLogger(TestAlertFireHistory.class);
+
   String _statName = "TestStat@DB=db1";
   String _stat = "TestStat";
   String metricName1 = "TestMetric1";
@@ -55,8 +59,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
 
   void setHealthData(int[] val1, int[] val2) {
     for (int i = 0; i < NODE_NR; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+      HelixManager manager = _participants[i];
       ZNRecord record = new ZNRecord(_stat);
       Map<String, String> valMap = new HashMap<String, String>();
       valMap.put(metricName1, val1[i] + "");
@@ -72,13 +75,12 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
     try {
       Thread.sleep(1000);
     } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error("Interrupted sleep", e);
     }
   }
 
   @Test
-  public void TestAlertDisable() throws InterruptedException {
+  public void testAlertDisable() throws InterruptedException {
 
     int[] metrics1 = {
         10, 15, 22, 24, 16
@@ -88,29 +90,27 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
     };
     setHealthData(metrics1, metrics2);
 
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    HelixManager manager = _controller;
     manager.startTimerTasks();
 
     _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr1);
     _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr2);
 
-    // ConfigScope scope = new ConfigScopeBuilder().forCluster(CLUSTER_NAME).build();
     HelixConfigScope scope =
         new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(CLUSTER_NAME).build();
     Map<String, String> properties = new HashMap<String, String>();
     properties.put("healthChange.enabled", "false");
     _setupTool.getClusterManagementTool().setConfig(scope, properties);
 
-    HealthStatsAggregator task =
-        new HealthStatsAggregator(_startCMResultMap.get(controllerName)._manager);
+    HealthStatsAggregator task = new HealthStatsAggregator(_controller);
+
     task.aggregate();
     Thread.sleep(100);
     HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
     Builder keyBuilder = helixDataAccessor.keyBuilder();
 
     AlertHistory history = manager.getHelixDataAccessor().getProperty(keyBuilder.alertHistory());
-    //
+
     Assert.assertEquals(history, null);
 
     properties.put("healthChange.enabled", "true");
@@ -126,7 +126,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
   }
 
   @Test
-  public void TestAlertHistory() throws InterruptedException {
+  public void testAlertHistory() throws InterruptedException {
     int[] metrics1 = {
         10, 15, 22, 24, 16
     };
@@ -135,9 +135,10 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
     };
     setHealthData(metrics1, metrics2);
 
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
-    manager.stopTimerTasks();
+    HelixManager manager = _controller;
+    for (HelixTimerTask task : _controller.getControllerTimerTasks()) {
+      task.stop();
+    }
 
     _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr1);
     _setupTool.getClusterManagementTool().addAlert(CLUSTER_NAME, _alertStr2);
@@ -152,8 +153,8 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
       historySize = property.getRecord().getMapFields().size();
     }
 
-    HealthStatsAggregator task =
-        new HealthStatsAggregator(_startCMResultMap.get(controllerName)._manager);
+    HealthStatsAggregator task = new HealthStatsAggregator(_controller);
+
     task.aggregate();
     Thread.sleep(100);
 
@@ -419,3 +420,4 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
   }
 
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
index efbd3b4..29f4893 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestDummyAlerts.java
@@ -29,10 +29,10 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.HealthStat;
 import org.apache.helix.model.Message;
@@ -73,7 +73,7 @@ public class TestDummyAlerts extends ZkIntegrationTestBase {
     String clusterName = className + "_" + methodName;
     final int n = 5;
 
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
@@ -95,15 +95,16 @@ public class TestDummyAlerts extends ZkIntegrationTestBase {
             "EXP(decay(1.0)(*.defaultPerfCounters@defaultPerfCounters.availableCPUs))CMP(GREATER)CON(2)");
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] =
-          new MockParticipant(clusterName, instanceName, ZK_ADDR, new DummyAlertsTransition());
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].setTransition(new DummyAlertsTransition());
       participants[i].syncStart();
     }
 
@@ -137,7 +138,6 @@ public class TestDummyAlerts extends ZkIntegrationTestBase {
     }
 
     // clean up
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
index f429b5f..23741c3 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
@@ -28,18 +28,16 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.alerts.AlertValueAndStatus;
 import org.apache.helix.api.State;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
 import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterSetup;
@@ -50,7 +48,6 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestExpandAlert extends ZkIntegrationTestBase {
-  ZkClient _zkClient;
   protected ClusterSetup _setupTool = null;
   protected final String _alertStr =
       "EXP(decay(1.0)(localhost_*.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(16)";
@@ -60,15 +57,12 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
 
   @BeforeClass()
   public void beforeClass() throws Exception {
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
 
-    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool = new ClusterSetup(_gZkClient);
   }
 
   @AfterClass
   public void afterClass() {
-    _zkClient.close();
   }
 
   public class ExpandAlertTransition extends MockTransition {
@@ -119,7 +113,7 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
   @Test()
   public void testExpandAlert() throws Exception {
     String clusterName = getShortClassName();
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     System.out.println("START TestExpandAlert at " + new Date(System.currentTimeMillis()));
 
@@ -135,18 +129,19 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
 
     _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
 
-    StartCMResult cmResult =
-        TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
-            HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
     // start participants
     for (int i = 0; i < 5; i++) // !!!change back to 5
     {
       String instanceName = "localhost_" + (12918 + i);
 
       participants[i] =
-          new MockParticipant(clusterName, instanceName, ZK_ADDR, new ExpandAlertTransition());
-      participants[i].start();
-      // new Thread(participants[i]).start();
+          new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].setTransition(new ExpandAlertTransition());
+      participants[i].syncStart();
     }
 
     boolean result =
@@ -157,13 +152,14 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
     Thread.sleep(1000);
     // HealthAggregationTask is supposed to run by a timer every 30s
     // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
-    new HealthStatsAggregator(cmResult._manager).aggregate();
+    // new HealthStatsAggregator(cmResult._manager).aggregate();
+    new HealthStatsAggregator(controller).aggregate();
     // sleep for a few seconds to give stats stage time to trigger
     Thread.sleep(3000);
 
     // other verifications go here
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     // for (int i = 0; i < 1; i++) //change 1 back to 5
@@ -180,6 +176,12 @@ public class TestExpandAlert extends ZkIntegrationTestBase {
     Assert.assertFalse(fired);
     // }
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+
+    }
     System.out.println("END TestExpandAlert at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
index 6d33df0..dbbd7aa 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
@@ -28,18 +28,16 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.alerts.AlertValueAndStatus;
 import org.apache.helix.api.State;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
 import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterSetup;
@@ -50,7 +48,6 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestSimpleAlert extends ZkIntegrationTestBase {
-  ZkClient _zkClient;
   protected ClusterSetup _setupTool = null;
   protected final String _alertStr =
       "EXP(decay(1.0)(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
@@ -59,15 +56,11 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
 
   @BeforeClass()
   public void beforeClass() throws Exception {
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
-
-    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool = new ClusterSetup(_gZkClient);
   }
 
   @AfterClass
   public void afterClass() {
-    _zkClient.close();
   }
 
   public class SimpleAlertTransition extends MockTransition {
@@ -124,7 +117,7 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
   @Test()
   public void testSimpleAlert() throws Exception {
     String clusterName = getShortClassName();
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     System.out.println("START TestSimpleAlert at " + new Date(System.currentTimeMillis()));
 
@@ -139,10 +132,11 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
 
     // enableHealthCheck(clusterName);
 
-    StartCMResult cmResult =
-        TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
-            HelixControllerMain.STANDALONE);
-    cmResult._manager.startTimerTasks();
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+    controller.startTimerTasks();
+
     _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
     // start participants
     for (int i = 0; i < 5; i++) // !!!change back to 5
@@ -150,9 +144,9 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
       String instanceName = "localhost_" + (12918 + i);
 
       participants[i] =
-          new MockParticipant(clusterName, instanceName, ZK_ADDR, new SimpleAlertTransition(15));
+          new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].setTransition(new SimpleAlertTransition(15));
       participants[i].syncStart();
-      // new Thread(participants[i]).start();
     }
 
     boolean result =
@@ -162,13 +156,14 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
 
     // HealthAggregationTask is supposed to run by a timer every 30s
     // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
-    new HealthStatsAggregator(cmResult._manager).aggregate();
+    // new HealthStatsAggregator(cmResult._manager).aggregate();
+    new HealthStatsAggregator(controller).aggregate();
     // sleep for a few seconds to give stats stage time to trigger
     Thread.sleep(3000);
 
     // other verifications go here
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
     // for (int i = 0; i < 1; i++) //change 1 back to 5
     // {
@@ -196,6 +191,11 @@ public class TestSimpleAlert extends ZkIntegrationTestBase {
             .equals("ON"));
     // }
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+    }
     System.out.println("END TestSimpleAlert at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
index 3eb31ed..90223ad 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
@@ -27,29 +27,29 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.alerts.AlertValueAndStatus;
 import org.apache.helix.api.State;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
 import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
-  ZkClient _zkClient;
+  private static Logger LOG = Logger.getLogger(TestSimpleWildcardAlert.class);
+
   protected ClusterSetup _setupTool = null;
   protected final String _alertStr =
       "EXP(decay(1.0)(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
@@ -58,15 +58,12 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
 
   @BeforeClass()
   public void beforeClass() throws Exception {
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
 
-    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool = new ClusterSetup(_gZkClient);
   }
 
   @AfterClass
   public void afterClass() {
-    _zkClient.close();
   }
 
   public class SimpleAlertTransition extends MockTransition {
@@ -123,7 +120,7 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
   @Test()
   public void testSimpleWildcardAlert() throws Exception {
     String clusterName = getShortClassName();
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     System.out.println("START testSimpleWildcardAlert at " + new Date(System.currentTimeMillis()));
 
@@ -138,10 +135,10 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
 
     // enableHealthCheck(clusterName);
 
-    StartCMResult cmResult =
-        TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
-            HelixControllerMain.STANDALONE);
-    cmResult._manager.stopTimerTasks();
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+    controller.stopTimerTasks();
 
     String alertwildcard =
         "EXP(decay(1.0)(localhost*.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
@@ -153,9 +150,9 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
       String instanceName = "localhost_" + (12944 + i);
 
       participants[i] =
-          new MockParticipant(clusterName, instanceName, ZK_ADDR, new SimpleAlertTransition(i * 5));
+          new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].setTransition(new SimpleAlertTransition(i * 5));
       participants[i].syncStart();
-      // new Thread(participants[i]).start();
     }
 
     boolean result =
@@ -166,13 +163,13 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
     Thread.sleep(1000);
     // HealthAggregationTask is supposed to run by a timer every 30s
     // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
-    new HealthStatsAggregator(cmResult._manager).aggregate();
+    new HealthStatsAggregator(controller).aggregate();
     // sleep for a few seconds to give stats stage time to trigger
     Thread.sleep(1000);
 
     // other verifications go here
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
     ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
     Map<String, Map<String, String>> recMap = record.getMapFields();
@@ -208,7 +205,7 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
     alertwildcard =
         "EXP(decay(1.0)(localhost*.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(15)";
     _setupTool.getClusterManagementTool().addAlert(clusterName, alertwildcard);
-    new HealthStatsAggregator(cmResult._manager).aggregate();
+    new HealthStatsAggregator(controller).aggregate();
     Thread.sleep(1000);
 
     record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
@@ -240,6 +237,11 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase {
       Assert.assertTrue(delta.get(alertString).equals("ON"));
     }
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+    }
     System.out.println("END testSimpleWildcardAlert at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
index 2661560..76784d3 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
@@ -28,18 +28,16 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.alerts.AlertValueAndStatus;
 import org.apache.helix.api.State;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
 import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterSetup;
@@ -50,7 +48,6 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestStalenessAlert extends ZkIntegrationTestBase {
-  ZkClient _zkClient;
   protected ClusterSetup _setupTool = null;
   protected final String _alertStr = "EXP(decay(1)(localhost_*.reportingage))CMP(GREATER)CON(600)";
   protected final String _alertStatusStr = _alertStr + " : (localhost_12918.reportingage)";
@@ -58,15 +55,12 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
 
   @BeforeClass()
   public void beforeClass() throws Exception {
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
 
-    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool = new ClusterSetup(_gZkClient);
   }
 
   @AfterClass
   public void afterClass() {
-    _zkClient.close();
   }
 
   public class StalenessAlertTransition extends MockTransition {
@@ -117,7 +111,7 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
   @Test()
   public void testStalenessAlert() throws Exception {
     String clusterName = getShortClassName();
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     System.out.println("START TestStalenessAlert at " + new Date(System.currentTimeMillis()));
 
@@ -133,18 +127,19 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
 
     _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
 
-    StartCMResult cmResult =
-        TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
-            HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
     // start participants
     for (int i = 0; i < 5; i++) // !!!change back to 5
     {
       String instanceName = "localhost_" + (12918 + i);
 
       participants[i] =
-          new MockParticipant(clusterName, instanceName, ZK_ADDR, new StalenessAlertTransition());
+          new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].setTransition(new StalenessAlertTransition());
       participants[i].syncStart();
-      // new Thread(participants[i]).start();
     }
 
     boolean result =
@@ -154,13 +149,13 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
 
     // HealthAggregationTask is supposed to run by a timer every 30s
     // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
-    new HealthStatsAggregator(cmResult._manager).aggregate();
+    new HealthStatsAggregator(controller).aggregate();
     // sleep for a few seconds to give stats stage time to trigger
     Thread.sleep(3000);
 
     // other verifications go here
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
     // for (int i = 0; i < 1; i++) //change 1 back to 5
     // {
@@ -176,6 +171,11 @@ public class TestStalenessAlert extends ZkIntegrationTestBase {
     // Assert.assertFalse(fired);
     // }
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+    }
     System.out.println("END TestStalenessAlert at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
index 5265ebb..79c85ca 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
@@ -43,18 +43,16 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.alerts.AlertValueAndStatus;
 import org.apache.helix.api.State;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
 import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.Message;
 import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
@@ -125,7 +123,6 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
   }
 
   private static final Logger _logger = Logger.getLogger(TestWildcardAlert.class);
-  ZkClient _zkClient;
   protected ClusterSetup _setupTool = null;
   protected final String _alertStr =
       "EXP(decay(1)(localhost_*.RestQueryStats@DBName=TestDB0.latency)|EXPAND|SUMEACH)CMP(GREATER)CON(10)";
@@ -134,15 +131,12 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
 
   @BeforeClass()
   public void beforeClass() throws Exception {
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
 
-    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool = new ClusterSetup(_gZkClient);
   }
 
   @AfterClass
   public void afterClass() {
-    _zkClient.close();
   }
 
   public class WildcardAlertTransition extends MockTransition {
@@ -208,7 +202,7 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
   @Test()
   public void testWildcardAlert() throws Exception {
     String clusterName = getShortClassName();
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     System.out.println("START TestWildcardAlert at " + new Date(System.currentTimeMillis()));
 
@@ -227,18 +221,18 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
     _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
     // _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr2);
 
-    StartCMResult cmResult =
-        TestHelper.startController(clusterName, "controller_0", ZK_ADDR,
-            HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
     // start participants
     for (int i = 0; i < 5; i++) // !!!change back to 5
     {
       String instanceName = "localhost_" + (12918 + i);
 
       participants[i] =
-          new MockParticipant(clusterName, instanceName, ZK_ADDR, new WildcardAlertTransition());
+          new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].setTransition(new WildcardAlertTransition());
       participants[i].syncStart();
-      // new Thread(participants[i]).start();
     }
 
     TestClusterMBeanObserver jmxMBeanObserver =
@@ -251,13 +245,13 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
     Thread.sleep(3000);
     // HealthAggregationTask is supposed to run by a timer every 30s
     // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
-    new HealthStatsAggregator(cmResult._manager).aggregate();
+    new HealthStatsAggregator(controller).aggregate();
 
     // sleep for a few seconds to give stats stage time to trigger and for bean to trigger
     Thread.sleep(3000);
 
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     // for (int i = 0; i < 1; i++) //change 1 back to 5
@@ -292,6 +286,12 @@ public class TestWildcardAlert extends ZkIntegrationTestBase {
             "EXP(decay(1)(localhost_%.RestQueryStats@DBName#TestDB0.latency)|EXPAND|SUMEACH)CMP(GREATER)CON(10)--(%)");
     // }
 
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < 5; i++) {
+      participants[i].syncStop();
+    }
+
     System.out.println("END TestWildcardAlert at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
index 82e45cc..32fdcff 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java
@@ -20,14 +20,9 @@ package org.apache.helix.integration;
  */
 
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.controller.HelixControllerMain;
+
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
@@ -44,13 +39,15 @@ public class TestAddClusterV2 extends ZkIntegrationTestBase {
   protected static final int START_PORT = 12918;
   protected static final String STATE_MODEL = "MasterSlave";
   protected ClusterSetup _setupTool = null;
-  protected Map<String, StartCMResult> _startCMResultMap = new HashMap<String, StartCMResult>();
 
   protected final String CLASS_NAME = getShortClassName();
   protected final String CONTROLLER_CLUSTER = CONTROLLER_CLUSTER_PREFIX + "_" + CLASS_NAME;
 
   protected static final String TEST_DB = "TestDB";
 
+  MockParticipantManager[] _participants = new MockParticipantManager[NODE_NR];
+  ClusterDistributedController[] _distControllers = new ClusterDistributedController[NODE_NR];
+
   @BeforeClass
   public void beforeClass() throws Exception {
     System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
@@ -88,29 +85,18 @@ public class TestAddClusterV2 extends ZkIntegrationTestBase {
         "MasterSlave", 3, true);
 
     // start dummy participants for the first cluster
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < NODE_NR; i++) {
       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      if (_startCMResultMap.get(instanceName) != null) {
-        LOG.error("fail to start participant:" + instanceName
-            + "(participant with the same name already running");
-      } else {
-        StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, firstCluster, instanceName);
-        _startCMResultMap.put(instanceName, result);
-      }
+      _participants[i] = new MockParticipantManager(ZK_ADDR, firstCluster, instanceName);
+      _participants[i].syncStart();
     }
 
     // start distributed cluster controllers
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < NODE_NR; i++) {
       String controllerName = CONTROLLER_PREFIX + "_" + i;
-      if (_startCMResultMap.get(controllerName) != null) {
-        LOG.error("fail to start controller:" + controllerName
-            + "(controller with the same name already running");
-      } else {
-        StartCMResult result =
-            TestHelper.startController(CONTROLLER_CLUSTER, controllerName, ZK_ADDR,
-                HelixControllerMain.DISTRIBUTED);
-        _startCMResultMap.put(controllerName, result);
-      }
+      _distControllers[i] =
+          new ClusterDistributedController(ZK_ADDR, CONTROLLER_CLUSTER, controllerName);
+      _distControllers[i].syncStart();
     }
 
     verifyClusters();
@@ -132,36 +118,22 @@ public class TestAddClusterV2 extends ZkIntegrationTestBase {
      * 3) disconnect leader/disconnect participant
      */
     String leader = getCurrentLeader(_gZkClient, CONTROLLER_CLUSTER);
-    // pauseController(_startCMResultMap.get(leader)._manager.getDataAccessor());
-
-    StartCMResult result;
-
-    Iterator<Entry<String, StartCMResult>> it = _startCMResultMap.entrySet().iterator();
-
-    while (it.hasNext()) {
-      String instanceName = it.next().getKey();
-      if (!instanceName.equals(leader) && instanceName.startsWith(CONTROLLER_PREFIX)) {
-        result = _startCMResultMap.get(instanceName);
-        result._manager.disconnect();
-        result._thread.interrupt();
-        it.remove();
+    int leaderIdx = -1;
+    for (int i = 0; i < NODE_NR; i++) {
+      if (!_distControllers[i].getInstanceName().equals(leader)) {
+        _distControllers[i].syncStop();
+        verifyClusters();
+      } else {
+        leaderIdx = i;
       }
-      verifyClusters();
     }
+    Assert.assertNotSame(leaderIdx, -1);
 
-    result = _startCMResultMap.remove(leader);
-    result._manager.disconnect();
-    result._thread.interrupt();
-
-    it = _startCMResultMap.entrySet().iterator();
-    while (it.hasNext()) {
-      String instanceName = it.next().getKey();
-      result = _startCMResultMap.get(instanceName);
-      result._manager.disconnect();
-      result._thread.interrupt();
-      it.remove();
-    }
+    _distControllers[leaderIdx].syncStop();
 
+    for (int i = 0; i < NODE_NR; i++) {
+      _participants[i].syncStop();
+    }
     System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
index d1014ed..79d8b89 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java
@@ -22,15 +22,14 @@ package org.apache.helix.integration;
 import java.util.Date;
 import java.util.List;
 
-import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
 import org.apache.helix.ZkTestHelper;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
@@ -50,16 +49,17 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, "localhost", "TestDB", 1, 20, nodeNr - 1,
         3, "MasterSlave", true);
 
-    MockParticipant[] participants = new MockParticipant[nodeNr];
+    MockParticipantManager[] participants = new MockParticipantManager[nodeNr];
     for (int i = 0; i < nodeNr - 1; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
-      new Thread(participants[i]).start();
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
     }
 
-    ZkHelixTestManager controller =
-        new ZkHelixTestManager(clusterName, "controller_0", InstanceType.CONTROLLER, ZK_ADDR);
-    controller.connect();
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
     boolean result;
     result =
         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
@@ -73,7 +73,7 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
     _gSetupTool.addInstanceToCluster(clusterName, "localhost_12922");
     _gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 3);
 
-    participants[nodeNr - 1] = new MockParticipant(clusterName, "localhost_12922", ZK_ADDR);
+    participants[nodeNr - 1] = new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12922");
     new Thread(participants[nodeNr - 1]).start();
     result =
         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
@@ -84,11 +84,10 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // clean up
-    // controller.disconnect();
-    // for (int i = 0; i < nodeNr; i++)
-    // {
-    // participants[i].syncStop();
-    // }
+    controller.syncStop();
+    for (int i = 0; i < nodeNr; i++) {
+      participants[i].syncStop();
+    }
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
@@ -99,11 +98,13 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
     // setup grand cluster
-    TestHelper.setupCluster("GRAND_" + clusterName, ZK_ADDR, 0, "controller", null, 0, 0, 1, 0,
+    final String grandClusterName = "GRAND_" + clusterName;
+    TestHelper.setupCluster(grandClusterName, ZK_ADDR, 0, "controller", null, 0, 0, 1, 0,
         null, true);
 
-    TestHelper.startController("GRAND_" + clusterName, "controller_0", ZK_ADDR,
-        HelixControllerMain.DISTRIBUTED);
+    ClusterDistributedController distController =
+        new ClusterDistributedController(ZK_ADDR, grandClusterName, "controller_0");
+    distController.syncStart();
 
     // setup cluster
     _gSetupTool.addCluster(clusterName, true);
@@ -125,12 +126,11 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
     _gSetupTool.addResourceToCluster(clusterName, "TestDB0", 1, "LeaderStandby");
     _gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 1);
 
-    MockParticipant[] participants = new MockParticipant[nodeNr];
+    MockParticipantManager[] participants = new MockParticipantManager[nodeNr];
     for (int i = 0; i < nodeNr - 1; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
-      // new Thread(participants[i]).start();
     }
 
     result =
@@ -148,10 +148,8 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
     _gSetupTool.addInstanceToCluster(clusterName, "localhost_12919");
     _gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 2);
 
-    participants[nodeNr - 1] = new MockParticipant(clusterName, "localhost_12919", ZK_ADDR);
+    participants[nodeNr - 1] = new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12919");
     participants[nodeNr - 1].syncStart();
-    // new Thread(participants[nodeNr - 1]).start();
-
     result =
         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
             ZK_ADDR, clusterName));
@@ -163,10 +161,10 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase {
     Assert.assertEquals(numberOfListeners, 2); // 1 of participant, and 1 of controller
 
     // clean up
-    // for (int i = 0; i < nodeNr; i++)
-    // {
-    // participants[i].syncStop();
-    // }
+    distController.syncStop();
+    for (int i = 0; i < nodeNr; i++) {
+      participants[i].syncStop();
+    }
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
index ba4eee2..123ce6e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java
@@ -28,9 +28,9 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.api.id.StateModelFactoryId;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
 import org.apache.helix.mock.participant.MockMSModelFactory;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterSetup;
@@ -50,7 +50,7 @@ public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -61,14 +61,15 @@ public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
         3, // replicas
         "MasterSlave", true); // do rebalance
 
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -114,7 +115,7 @@ public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
     // register "TestDB1_Factory" state model factory
     // Logger.getRootLogger().setLevel(Level.INFO);
     for (int i = 0; i < n; i++) {
-      participants[i].getManager().getStateMachineEngine()
+      participants[i].getStateMachineEngine()
           .registerStateModelFactory("MasterSlave", new MockMSModelFactory(), "TestDB1_Factory");
     }
 
@@ -125,7 +126,6 @@ public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
 
     // clean up
     // wait for all zk callbacks done
-    Thread.sleep(1000);
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
index a008814..fe23cb9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoIsWithEmptyMap.java
@@ -27,9 +27,9 @@ import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -74,15 +74,16 @@ public class TestAutoIsWithEmptyMap extends ZkIntegrationTestBase {
     _gZkClient.writeData(idealPath, curIdealState);
 
     // start controller
-    TestHelper
-        .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
 
     // start participants
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -92,6 +93,7 @@ public class TestAutoIsWithEmptyMap extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // clean up
+    controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
index 0e7f4fa..2aec114 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java
@@ -28,13 +28,13 @@ import java.util.Set;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.State;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ExternalView;
@@ -62,13 +62,11 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
     // Logger.getRootLogger().setLevel(Level.INFO);
     System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
 
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
     String namespace = "/" + CLUSTER_NAME;
-    if (_zkClient.exists(namespace)) {
-      _zkClient.deleteRecursive(namespace);
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
     }
-    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool = new ClusterSetup(_gZkClient);
 
     // setup storage cluster
     _setupTool.addCluster(CLUSTER_NAME, true);
@@ -95,24 +93,21 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
     // start dummy participants
     for (int i = 0; i < NODE_NR; i++) {
       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      if (_startCMResultMap.get(instanceName) != null) {
-        LOG.error("fail to start particpant:" + instanceName
-            + "(participant with same name already exists)");
-      } else {
-        StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
-        _startCMResultMap.put(instanceName, result);
-      }
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      participant.syncStart();
+      _participants[i] = participant;
+
     }
 
     // start controller
     String controllerName = CONTROLLER_PREFIX + "_0";
-    StartCMResult startResult =
-        TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
-            HelixControllerMain.STANDALONE);
-    _startCMResultMap.put(controllerName, startResult);
+    _controller =
+        new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
 
     boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, TEST_DB));
 
     Assert.assertTrue(result);
@@ -128,7 +123,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", 1);
 
     boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, "MyDB"));
     Assert.assertTrue(result);
 
@@ -146,7 +141,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB2", 3);
 
     result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, "MyDB2"));
     Assert.assertTrue(result);
 
@@ -160,16 +155,11 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
 
   @Test()
   public void testAutoRebalance() throws Exception {
-
     // kill 1 node
-    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
-    _startCMResultMap.get(instanceName)._manager.disconnect();
-    Thread.sleep(1000);
-    _startCMResultMap.get(instanceName)._thread.interrupt();
+    _participants[0].syncStop();
 
-    // verifyBalanceExternalView();
     boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, TEST_DB));
     Assert.assertTrue(result);
 
@@ -178,22 +168,22 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
       String storageNodeName = PARTICIPANT_PREFIX + "_" + (1000 + i);
       _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
 
-      StartCMResult resultx =
-          TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
-      _startCMResultMap.put(storageNodeName, resultx);
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
+      participant.syncStart();
     }
     Thread.sleep(5000);
     result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, TEST_DB));
     Assert.assertTrue(result);
 
     result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, db2));
     Assert.assertTrue(result);
     HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
     ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2));
     Set<String> instancesSet = new HashSet<String>();
@@ -246,12 +236,10 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
   }
 
   public static class ExternalViewBalancedVerifier implements ZkVerifier {
-    ZkClient _client;
     String _clusterName;
     String _resourceName;
 
     public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName) {
-      _client = client;
       _clusterName = clusterName;
       _resourceName = resourceName;
     }
@@ -259,7 +247,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
     @Override
     public boolean verify() {
       HelixDataAccessor accessor =
-          new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_client));
+          new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
       Builder keyBuilder = accessor.keyBuilder();
       IdealState idealState = accessor.getProperty(keyBuilder.idealStates(_resourceName));
       if (idealState == null) {
@@ -298,7 +286,7 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC
 
     @Override
     public ZkClient getZkClient() {
-      return _client;
+      return _gZkClient;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
index 3523461..5f9f48c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java
@@ -27,12 +27,12 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.State;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ExternalView;
@@ -58,13 +58,11 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
     // Logger.getRootLogger().setLevel(Level.INFO);
     System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
 
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
     String namespace = "/" + CLUSTER_NAME;
-    if (_zkClient.exists(namespace)) {
-      _zkClient.deleteRecursive(namespace);
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
     }
-    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool = new ClusterSetup(_gZkClient);
 
     // setup storage cluster
     _setupTool.addCluster(CLUSTER_NAME, true);
@@ -79,40 +77,33 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
 
     // start controller
     String controllerName = CONTROLLER_PREFIX + "_0";
-    StartCMResult startResult =
-        TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
-            HelixControllerMain.STANDALONE);
-    _startCMResultMap.put(controllerName, startResult);
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
 
-    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    HelixManager manager = _controller; // _startCMResultMap.get(controllerName)._manager;
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
     // start dummy participants
     for (int i = 0; i < NODE_NR; i++) {
       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      if (_startCMResultMap.get(instanceName) != null) {
-        LOG.error("fail to start particpant:" + instanceName
-            + "(participant with same name already exists)");
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants[i].syncStart();
+      Thread.sleep(2000);
+      boolean result =
+          ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
+              CLUSTER_NAME, TEST_DB));
+      Assert.assertTrue(result);
+      ExternalView ev =
+          manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
+      System.out.println(ev.getPartitionSet().size());
+      if (i < 3) {
+        Assert.assertEquals(ev.getPartitionSet().size(), 25 * (i + 1));
       } else {
-        startResult = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
-        _startCMResultMap.put(instanceName, startResult);
-        Thread.sleep(2000);
-        boolean result =
-            ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
-                CLUSTER_NAME, TEST_DB));
-        Assert.assertTrue(result);
-        ExternalView ev =
-            manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
-        System.out.println(ev.getPartitionSet().size());
-        if (i < 3) {
-          Assert.assertEquals(ev.getPartitionSet().size(), 25 * (i + 1));
-        } else {
-          Assert.assertEquals(ev.getPartitionSet().size(), 100);
-        }
+        Assert.assertEquals(ev.getPartitionSet().size(), 100);
       }
     }
 
     boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, TEST_DB));
 
     Assert.assertTrue(result);
@@ -120,49 +111,50 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
 
   @Test()
   public void testAutoRebalanceWithMaxPartitionPerNode() throws Exception {
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    HelixManager manager = _controller;
     // kill 1 node
-    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
-    _startCMResultMap.get(instanceName)._manager.disconnect();
-    Thread.sleep(1000);
-    _startCMResultMap.get(instanceName)._thread.interrupt();
+    _participants[0].syncStop();
 
     // verifyBalanceExternalView();
     boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, TEST_DB));
     Assert.assertTrue(result);
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    final HelixDataAccessor accessor = manager.getHelixDataAccessor();
     ExternalView ev =
         manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
     Assert.assertEquals(ev.getPartitionSet().size(), 100);
 
-    instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 1);
-    _startCMResultMap.get(instanceName)._manager.disconnect();
-    Thread.sleep(1000);
-    _startCMResultMap.get(instanceName)._thread.interrupt();
+    _participants[1].syncStop();
 
     // verifyBalanceExternalView();
     result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, TEST_DB));
     Assert.assertTrue(result);
-    ev = manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
-    Assert.assertEquals(ev.getPartitionSet().size(), 75);
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        ExternalView ev = accessor.getProperty(accessor.keyBuilder().externalView(TEST_DB));
+        return ev.getPartitionSet().size() == 75;
+      }
+    }, 3 * 1000);
 
     // add 2 nodes
     for (int i = 0; i < 2; i++) {
       String storageNodeName = PARTICIPANT_PREFIX + "_" + (1000 + i);
       _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
 
-      StartCMResult resultx =
-          TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
-      _startCMResultMap.put(storageNodeName, resultx);
+      String newInstanceName = storageNodeName.replace(':', '_');
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newInstanceName);
+      participant.syncStart();
     }
-    Thread.sleep(1000);
+
+    // Thread.sleep(1000);
     result =
-        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
+        ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient,
             CLUSTER_NAME, TEST_DB));
     Assert.assertTrue(result);
   }
@@ -213,12 +205,10 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
   }
 
   public static class ExternalViewBalancedVerifier implements ZkVerifier {
-    ZkClient _client;
     String _clusterName;
     String _resourceName;
 
     public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName) {
-      _client = client;
       _clusterName = clusterName;
       _resourceName = resourceName;
     }
@@ -226,7 +216,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
     @Override
     public boolean verify() {
       HelixDataAccessor accessor =
-          new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_client));
+          new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
       Builder keyBuilder = accessor.keyBuilder();
       IdealState idealState = accessor.getProperty(keyBuilder.idealStates(_resourceName));
       int numberOfPartitions = idealState.getRecord().getListFields().size();
@@ -244,7 +234,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP
 
     @Override
     public ZkClient getZkClient() {
-      return _client;
+      return _gZkClient;
     }
 
     @Override