You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/11/20 22:12:24 UTC

[09/52] [abbrv] git commit: [HELIX-279] Apply gc handling fixes to ZKHelixManager

[HELIX-279] Apply gc handling fixes to ZKHelixManager


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/f8e3b1af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/f8e3b1af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/f8e3b1af

Branch: refs/heads/helix-yarn
Commit: f8e3b1af5c94b779d4244137131dbabc593ca0e8
Parents: 18a8c7c
Author: zzhang <zz...@apache.org>
Authored: Mon Nov 11 13:09:24 2013 -0800
Committer: zzhang <zz...@apache.org>
Committed: Mon Nov 11 13:09:24 2013 -0800

----------------------------------------------------------------------
 .../tools/TestHelixAdminScenariosRest.java      |  889 +++++++++------
 .../apache/helix/tools/TestResetInstance.java   |   21 +-
 .../helix/tools/TestResetPartitionState.java    |   24 +-
 .../apache/helix/tools/TestResetResource.java   |   22 +-
 .../org/apache/helix/agent/TestHelixAgent.java  |   27 +-
 .../java/org/apache/helix/HelixManager.java     |    7 +
 .../java/org/apache/helix/HelixProperty.java    |    6 +-
 .../rebalancer/SemiAutoRebalancer.java          |    3 +
 .../util/ConstraintBasedAssignment.java         |   55 +-
 .../stages/BestPossibleStateCalcStage.java      |   64 +-
 .../stages/CurrentStateComputationStage.java    |   13 +-
 .../stages/MessageGenerationStage.java          |    2 +-
 .../stages/MessageSelectionStage.java           |    9 +-
 .../controller/stages/MessageThrottleStage.java |   11 +-
 .../controller/stages/ResourceCurrentState.java |   13 +-
 .../controller/stages/TaskAssignmentStage.java  |   12 +-
 .../helix/manager/zk/AbstractManager.java       |  691 ------------
 .../helix/manager/zk/ControllerManager.java     |  174 ---
 .../manager/zk/ControllerManagerHelper.java     |    7 +-
 .../DefaultSchedulerMessageHandlerFactory.java  |    1 +
 .../zk/DistributedControllerManager.java        |  190 ----
 .../manager/zk/DistributedLeaderElection.java   |   16 +-
 .../manager/zk/HelixConnectionAdaptor.java      |    6 +
 .../helix/manager/zk/ParticipantManager.java    |  153 ---
 .../manager/zk/ParticipantManagerHelper.java    |   27 +-
 .../apache/helix/manager/zk/ZKHelixManager.java | 1041 +++++++++---------
 .../helix/manager/zk/ZkAsyncCallbacks.java      |    3 +-
 .../helix/manager/zk/ZkStateChangeListener.java |  127 ---
 .../messaging/DefaultMessagingService.java      |    4 +-
 .../apache/helix/model/ResourceAssignment.java  |   19 +-
 .../src/test/java/org/apache/helix/Mocks.java   |    6 +
 .../test/java/org/apache/helix/TestHelper.java  |   97 +-
 .../org/apache/helix/TestZkClientWrapper.java   |   45 +-
 .../java/org/apache/helix/TestZnodeModify.java  |    3 +-
 .../org/apache/helix/ZkHelixTestManager.java    |   44 -
 .../org/apache/helix/api/TestNewStages.java     |   12 +-
 .../controller/stages/DummyClusterManager.java  |    6 +
 .../stages/TestMessageThrottleStage.java        |   30 +-
 .../stages/TestParseInfoFromAlert.java          |    5 +-
 .../stages/TestRebalancePipeline.java           |   19 +-
 .../strategy/TestShufflingTwoStateStrategy.java |   68 +-
 .../helix/healthcheck/TestAddDropAlert.java     |   35 +-
 .../healthcheck/TestAlertActionTriggering.java  |   21 +-
 .../helix/healthcheck/TestAlertFireHistory.java |   38 +-
 .../helix/healthcheck/TestDummyAlerts.java      |   14 +-
 .../helix/healthcheck/TestExpandAlert.java      |   40 +-
 .../helix/healthcheck/TestSimpleAlert.java      |   40 +-
 .../healthcheck/TestSimpleWildcardAlert.java    |   42 +-
 .../helix/healthcheck/TestStalenessAlert.java   |   36 +-
 .../helix/healthcheck/TestWildcardAlert.java    |   36 +-
 .../helix/integration/TestAddClusterV2.java     |   78 +-
 .../TestAddNodeAfterControllerStart.java        |   56 +-
 .../TestAddStateModelFactoryAfterConnect.java   |   14 +-
 .../integration/TestAutoIsWithEmptyMap.java     |   14 +-
 .../helix/integration/TestAutoRebalance.java    |   64 +-
 .../TestAutoRebalancePartitionLimit.java        |  102 +-
 .../helix/integration/TestBatchMessage.java     |   50 +-
 .../integration/TestBatchMessageWrapper.java    |   18 +-
 .../integration/TestBucketizedResource.java     |   15 +-
 .../integration/TestCarryOverBadCurState.java   |   19 +-
 .../integration/TestCleanupExternalView.java    |   20 +-
 .../helix/integration/TestClusterStartsup.java  |   15 +-
 .../helix/integration/TestCustomIdealState.java |   12 -
 .../TestCustomizedIdealStateRebalancer.java     |    4 +-
 .../apache/helix/integration/TestDisable.java   |   36 +-
 .../helix/integration/TestDisableNode.java      |    2 +-
 .../helix/integration/TestDisablePartition.java |    2 +-
 .../integration/TestDistributedCMMain.java      |   18 +-
 .../TestDistributedClusterController.java       |   18 +-
 .../apache/helix/integration/TestDriver.java    |   80 +-
 .../org/apache/helix/integration/TestDrop.java  |  150 ++-
 .../helix/integration/TestDropResource.java     |    9 +-
 .../TestEnablePartitionDuringDisable.java       |   22 +-
 .../helix/integration/TestErrorPartition.java   |   28 +-
 .../integration/TestExternalViewUpdates.java    |   21 +-
 .../integration/TestHelixCustomCodeRunner.java  |   40 +-
 .../helix/integration/TestHelixInstanceTag.java |    3 +-
 .../helix/integration/TestInstanceAutoJoin.java |   25 +-
 .../integration/TestInvalidAutoIdealState.java  |   14 +-
 .../TestMessagePartitionStateMismatch.java      |    4 +-
 .../helix/integration/TestMessageThrottle.java  |   16 +-
 .../helix/integration/TestMessageThrottle2.java |    4 +-
 .../helix/integration/TestMessagingService.java |   81 +-
 .../integration/TestNonOfflineInitState.java    |   28 +-
 .../helix/integration/TestNullReplica.java      |   15 +-
 .../TestParticipantErrorMessage.java            |   14 +-
 .../TestParticipantNameCollision.java           |    9 +-
 .../helix/integration/TestPauseSignal.java      |   15 +-
 .../integration/TestRedefineStateModelDef.java  |   13 +-
 .../helix/integration/TestRenamePartition.java  |   45 +-
 .../helix/integration/TestResetInstance.java    |   18 +-
 .../integration/TestResetPartitionState.java    |   17 +-
 .../helix/integration/TestResetResource.java    |   18 +-
 .../integration/TestRestartParticipant.java     |   41 +-
 .../helix/integration/TestSchedulerMessage.java |  586 +++++++---
 .../integration/TestSchedulerMsgContraints.java |  254 -----
 .../integration/TestSchedulerMsgUsingQueue.java |  181 ---
 .../helix/integration/TestSchemataSM.java       |   14 +-
 .../TestSessionExpiryInTransition.java          |   29 +-
 .../helix/integration/TestStandAloneCMMain.java |   40 +-
 .../TestStandAloneCMSessionExpiry.java          |   22 +-
 ...estStartMultipleControllersWithSameName.java |    9 +-
 .../integration/TestStateTransitionTimeout.java |   38 +-
 .../helix/integration/TestSwapInstance.java     |   16 +-
 .../integration/TestZkCallbackHandlerLeak.java  |   89 +-
 .../integration/ZkIntegrationTestBase.java      |   49 +-
 .../integration/ZkStandAloneCMTestBase.java     |   63 +-
 ...dAloneCMTestBaseWithPropertyServerCheck.java |   25 +-
 .../manager/ClusterControllerManager.java       |   17 +-
 .../manager/ClusterDistributedController.java   |   20 +-
 .../manager/MockParticipantManager.java         |    8 +-
 .../TestDistributedControllerManager.java       |    9 +-
 .../manager/TestParticipantManager.java         |   26 +-
 .../apache/helix/josql/TestJosqlProcessor.java  |    8 +-
 .../TestDefaultControllerMsgHandlerFactory.java |    9 +-
 .../helix/manager/zk/TestHandleNewSession.java  |   16 +-
 .../manager/zk/TestLiveInstanceBounce.java      |   20 +-
 .../zk/TestZKPropertyTransferServer.java        |   21 +-
 .../helix/manager/zk/TestZkClusterManager.java  |   16 +-
 .../apache/helix/manager/zk/TestZkFlapping.java |   14 +-
 .../zk/TestZkManagerFlappingDetection.java      |  117 +-
 .../manager/zk/TestZkStateChangeListener.java   |   54 +-
 .../handling/TestConfigThreadpoolSize.java      |   10 +-
 .../handling/TestResourceThreadpoolSize.java    |    8 +-
 .../mock/controller/ClusterController.java      |  127 ---
 .../MockHealthReportParticipant.java            |   16 +-
 .../helix/mock/participant/MockParticipant.java |  181 ---
 .../helix/participant/MockZKHelixManager.java   |    6 +
 .../TestDistControllerStateModel.java           |   13 +-
 .../apache/helix/tools/TestHelixAdminCli.java   |  162 +--
 .../helix/userdefinedrebalancer/Lock.java       |    5 +
 131 files changed, 3175 insertions(+), 4684 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosRest.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosRest.java b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosRest.java
index 04a70d2..6a0e331 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosRest.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestHelixAdminScenariosRest.java
@@ -19,24 +19,19 @@ package org.apache.helix.tools;
  * under the License.
  */
 
-/*
- * Simulate all the admin tasks needed by using command line tool
- * 
- * */
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKUtil;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState.IdealStateProperty;
@@ -64,8 +59,10 @@ import org.restlet.representation.Representation;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+/**
+ * Simulate all the admin tasks needed by using command line tool
+ */
 public class TestHelixAdminScenariosRest extends AdminTestBase {
-  Map<String, StartCMResult> _startCMResultMap = new HashMap<String, StartCMResult>();
   RestAdminApplication _adminApp;
   Component _component;
   String _tag1 = "tag1123";
@@ -90,55 +87,6 @@ public class TestHelixAdminScenariosRest extends AdminTestBase {
     return mapper.readValue(sr, clazz);
   }
 
-  @Test
-  public void testAddDeleteClusterAndInstanceAndResource() throws Exception {
-    // Helix bug helix-102
-    // ZKPropertyTransferServer.PERIOD = 500;
-    // ZkPropertyTransferClient.SEND_PERIOD = 500;
-    // ZKPropertyTransferServer.getInstance().init(19999, ZK_ADDR);
-
-    /** ======================= Add clusters ============================== */
-
-    testAddCluster();
-
-    /** ================= Add / drop some resources =========================== */
-
-    testAddResource();
-
-    /** ====================== Add / delete instances =========================== */
-
-    testAddInstance();
-
-    /** ===================== Rebalance resource =========================== */
-
-    testRebalanceResource();
-
-    /** ==================== start the clusters ============================= */
-
-    testStartCluster();
-
-    /** ==================== drop add resource in live clusters =================== */
-    testDropAddResource();
-
-    /** ======================Operations with live node ============================ */
-
-    testInstanceOperations();
-
-    /** ======================Operations with partitions ============================ */
-
-    testEnablePartitions();
-
-    /** ============================ expand cluster =========================== */
-
-    testExpandCluster();
-
-    /** ============================ deactivate cluster =========================== */
-    testDeactivateCluster();
-
-    // wait all zk callbacks done
-    Thread.sleep(1000);
-  }
-
   static String assertSuccessPostOperation(String url, Map<String, String> jsonParameters,
       boolean hasException) throws IOException {
     Reference resourceRef = new Reference(url);
@@ -226,41 +174,46 @@ public class TestHelixAdminScenariosRest extends AdminTestBase {
     Assert.assertTrue(exceptionThrown);
   }
 
+  private Map<String, String> addClusterCmd(String clusterName) {
+    Map<String, String> parameters = new HashMap<String, String>();
+    parameters.put(JsonParameters.CLUSTER_NAME, clusterName);
+    parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addCluster);
+
+    return parameters;
+  }
+
+  private void addCluster(String clusterName) throws IOException {
+    String url = "http://localhost:" + ADMIN_PORT + "/clusters";
+    String response = assertSuccessPostOperation(url, addClusterCmd(clusterName), false);
+    Assert.assertTrue(response.contains(clusterName));
+  }
+
+  @Test
   public void testAddCluster() throws Exception {
     String url = "http://localhost:" + ADMIN_PORT + "/clusters";
-    Map<String, String> paraMap = new HashMap<String, String>();
 
     // Normal add
-    paraMap.put(JsonParameters.CLUSTER_NAME, "clusterTest");
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addCluster);
-
-    String response = assertSuccessPostOperation(url, paraMap, false);
+    String response = assertSuccessPostOperation(url, addClusterCmd("clusterTest"), false);
     Assert.assertTrue(response.contains("clusterTest"));
 
     // malformed cluster name
-    paraMap.put(JsonParameters.CLUSTER_NAME, "/ClusterTest");
-    response = assertSuccessPostOperation(url, paraMap, true);
+    response = assertSuccessPostOperation(url, addClusterCmd("/ClusterTest"), true);
 
     // Add the grand cluster
-    paraMap.put(JsonParameters.CLUSTER_NAME, "Klazt3rz");
-    response = assertSuccessPostOperation(url, paraMap, false);
+    response = assertSuccessPostOperation(url, addClusterCmd("Klazt3rz"), false);
     Assert.assertTrue(response.contains("Klazt3rz"));
 
-    paraMap.put(JsonParameters.CLUSTER_NAME, "\\ClusterTest");
-    response = assertSuccessPostOperation(url, paraMap, false);
+    response = assertSuccessPostOperation(url, addClusterCmd("\\ClusterTest"), false);
     Assert.assertTrue(response.contains("\\ClusterTest"));
 
     // Add already exist cluster
-    paraMap.put(JsonParameters.CLUSTER_NAME, "clusterTest");
-    response = assertSuccessPostOperation(url, paraMap, true);
+    response = assertSuccessPostOperation(url, addClusterCmd("clusterTest"), true);
 
     // delete cluster without resource and instance
     Assert.assertTrue(ZKUtil.isClusterSetup("Klazt3rz", _gZkClient));
     Assert.assertTrue(ZKUtil.isClusterSetup("clusterTest", _gZkClient));
     Assert.assertTrue(ZKUtil.isClusterSetup("\\ClusterTest", _gZkClient));
 
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.dropCluster);
-
     String clusterUrl = getClusterUrl("\\ClusterTest");
     deleteUrl(clusterUrl, false);
 
@@ -284,94 +237,180 @@ public class TestHelixAdminScenariosRest extends AdminTestBase {
     Assert.assertFalse(_gZkClient.exists("/clusterTest1"));
     Assert.assertFalse(_gZkClient.exists("/clusterTestOK"));
 
-    paraMap.put(JsonParameters.CLUSTER_NAME, "clusterTest1");
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addCluster);
-    response = assertSuccessPostOperation(url, paraMap, false);
+    response = assertSuccessPostOperation(url, addClusterCmd("clusterTest1"), false);
     response = getUrl(clustersUrl);
     Assert.assertTrue(response.contains("clusterTest1"));
   }
 
-  public void testAddResource() throws Exception {
-    String reourcesUrl = "http://localhost:" + ADMIN_PORT + "/clusters/clusterTest1/resourceGroups";
+  private Map<String, String> addResourceCmd(String resourceName, String stateModelDef,
+      int partition) {
+    Map<String, String> parameters = new HashMap<String, String>();
 
-    Map<String, String> paraMap = new HashMap<String, String>();
-    paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, "db_22");
-    paraMap.put(JsonParameters.STATE_MODEL_DEF_REF, "MasterSlave");
-    paraMap.put(JsonParameters.PARTITIONS, "144");
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addResource);
+    parameters.put(JsonParameters.RESOURCE_GROUP_NAME, resourceName);
+    parameters.put(JsonParameters.STATE_MODEL_DEF_REF, stateModelDef);
+    parameters.put(JsonParameters.PARTITIONS, "" + partition);
+    parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addResource);
 
-    String response = assertSuccessPostOperation(reourcesUrl, paraMap, false);
-    Assert.assertTrue(response.contains("db_22"));
+    return parameters;
+  }
+
+  private void addResource(String clusterName, String resourceName, int partitions)
+      throws IOException {
+    final String reourcesUrl =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups";
+    String response =
+        assertSuccessPostOperation(reourcesUrl,
+            addResourceCmd(resourceName, "MasterSlave", partitions), false);
+    Assert.assertTrue(response.contains(resourceName));
+  }
 
-    paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, "db_11");
-    paraMap.put(JsonParameters.STATE_MODEL_DEF_REF, "MasterSlave");
-    paraMap.put(JsonParameters.PARTITIONS, "44");
+  @Test
+  public void testAddResource() throws Exception {
+    final String clusterName = "clusterTestAddResource";
+    addCluster(clusterName);
+
+    String reourcesUrl =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups";
+    String response =
+        assertSuccessPostOperation(reourcesUrl, addResourceCmd("db_22", "MasterSlave", 144), false);
+    Assert.assertTrue(response.contains("db_22"));
 
-    response = assertSuccessPostOperation(reourcesUrl, paraMap, false);
+    response =
+        assertSuccessPostOperation(reourcesUrl, addResourceCmd("db_11", "MasterSlave", 44), false);
     Assert.assertTrue(response.contains("db_11"));
 
     // Add duplicate resource
-    paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, "db_22");
-    paraMap.put(JsonParameters.STATE_MODEL_DEF_REF, "OnlineOffline");
-    paraMap.put(JsonParameters.PARTITIONS, "55");
-
-    response = assertSuccessPostOperation(reourcesUrl, paraMap, true);
+    response =
+        assertSuccessPostOperation(reourcesUrl, addResourceCmd("db_22", "OnlineOffline", 55), true);
 
     // drop resource now
-    String resourceUrl = getResourceUrl("clusterTest1", "db_11");
+    String resourceUrl = getResourceUrl(clusterName, "db_11");
     deleteUrl(resourceUrl, false);
-    Assert.assertFalse(_gZkClient.exists("/clusterTest1/IDEALSTATES/db_11"));
+    Assert.assertFalse(_gZkClient.exists("/" + clusterName + "/IDEALSTATES/db_11"));
 
-    paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, "db_11");
-    paraMap.put(JsonParameters.STATE_MODEL_DEF_REF, "MasterSlave");
-    paraMap.put(JsonParameters.PARTITIONS, "44");
-    response = assertSuccessPostOperation(reourcesUrl, paraMap, false);
+    response =
+        assertSuccessPostOperation(reourcesUrl, addResourceCmd("db_11", "MasterSlave", 44), false);
     Assert.assertTrue(response.contains("db_11"));
 
-    Assert.assertTrue(_gZkClient.exists("/clusterTest1/IDEALSTATES/db_11"));
+    Assert.assertTrue(_gZkClient.exists("/" + clusterName + "/IDEALSTATES/db_11"));
 
-    paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, "db_33");
-    response = assertSuccessPostOperation(reourcesUrl, paraMap, false);
+    response =
+        assertSuccessPostOperation(reourcesUrl, addResourceCmd("db_33", "MasterSlave", 44), false);
     Assert.assertTrue(response.contains("db_33"));
 
-    paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, "db_44");
-    response = assertSuccessPostOperation(reourcesUrl, paraMap, false);
+    response =
+        assertSuccessPostOperation(reourcesUrl, addResourceCmd("db_44", "MasterSlave", 44), false);
     Assert.assertTrue(response.contains("db_44"));
   }
 
-  private void testDeactivateCluster() throws Exception, InterruptedException {
-    HelixDataAccessor accessor;
-    String path;
-    // deactivate cluster
-    String clusterUrl = getClusterUrl("clusterTest1");
-    Map<String, String> paraMap = new HashMap<String, String>();
-    paraMap.put(JsonParameters.ENABLED, "false");
-    paraMap.put(JsonParameters.GRAND_CLUSTER, "Klazt3rz");
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.activateCluster);
+  private Map<String, String> activateClusterCmd(String grandClusterName, boolean enabled) {
+    Map<String, String> parameters = new HashMap<String, String>();
+    parameters.put(JsonParameters.GRAND_CLUSTER, grandClusterName);
+    parameters.put(JsonParameters.ENABLED, "" + enabled);
+    parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.activateCluster);
 
-    String response = assertSuccessPostOperation(clusterUrl, paraMap, false);
+    return parameters;
+  }
+
+  @Test
+  public void testDeactivateCluster() throws Exception {
+    final String clusterName = "clusterTestDeactivateCluster";
+    final String controllerClusterName = "controllerClusterTestDeactivateCluster";
+
+    Map<String, MockParticipantManager> participants =
+        new HashMap<String, MockParticipantManager>();
+    Map<String, ClusterDistributedController> distControllers =
+        new HashMap<String, ClusterDistributedController>();
+
+    // setup cluster
+    addCluster(clusterName);
+    addInstancesToCluster(clusterName, "localhost:123", 6, null);
+    addResource(clusterName, "db_11", 16);
+    rebalanceResource(clusterName, "db_11");
+
+    addCluster(controllerClusterName);
+    addInstancesToCluster(controllerClusterName, "controller_900", 2, null);
+
+    // start mock nodes
+    for (int i = 0; i < 6; i++) {
+      String instanceName = "localhost_123" + i;
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participant.syncStart();
+      participants.put(instanceName, participant);
+    }
+
+    // start controller nodes
+    for (int i = 0; i < 2; i++) {
+      String controllerName = "controller_900" + i;
+      ClusterDistributedController distController =
+          new ClusterDistributedController(ZK_ADDR, controllerClusterName, controllerName);
+      distController.syncStart();
+      distControllers.put(controllerName, distController);
+    }
+
+    String clusterUrl = getClusterUrl(clusterName);
+
+    // activate cluster
+    assertSuccessPostOperation(clusterUrl, activateClusterCmd(controllerClusterName, true), false);
+    boolean verifyResult =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            controllerClusterName));
+    Assert.assertTrue(verifyResult);
+
+    verifyResult =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(verifyResult);
+
+    // deactivate cluster
+    assertSuccessPostOperation(clusterUrl, activateClusterCmd(controllerClusterName, false), false);
     Thread.sleep(6000);
-    Assert.assertFalse(_gZkClient.exists("/Klazt3rz/IDEALSTATES/clusterTest1"));
+    Assert.assertFalse(_gZkClient.exists("/" + controllerClusterName + "/IDEALSTATES/"
+        + clusterName));
 
-    accessor = _startCMResultMap.get("localhost_1231")._manager.getHelixDataAccessor();
-    path = accessor.keyBuilder().controllerLeader().getPath();
+    HelixDataAccessor accessor = participants.get("localhost_1231").getHelixDataAccessor();
+    String path = accessor.keyBuilder().controllerLeader().getPath();
     Assert.assertFalse(_gZkClient.exists(path));
 
     deleteUrl(clusterUrl, true);
+    Assert.assertTrue(_gZkClient.exists("/" + clusterName));
 
-    Assert.assertTrue(_gZkClient.exists("/clusterTest1"));
     // leader node should be gone
-    for (StartCMResult result : _startCMResultMap.values()) {
-      result._manager.disconnect();
-      result._thread.interrupt();
+    for (MockParticipantManager participant : participants.values()) {
+      participant.syncStop();
     }
     deleteUrl(clusterUrl, false);
 
-    Assert.assertFalse(_gZkClient.exists("/clusterTest1"));
+    Assert.assertFalse(_gZkClient.exists("/" + clusterName));
+
+    // clean up
+    for (ClusterDistributedController controller : distControllers.values()) {
+      controller.syncStop();
+    }
+
+    for (MockParticipantManager participant : participants.values()) {
+      participant.syncStop();
+    }
+  }
+
+  private Map<String, String> addIdealStateCmd() {
+    Map<String, String> parameters = new HashMap<String, String>();
+    parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addIdealState);
+
+    return parameters;
   }
 
-  private void testDropAddResource() throws Exception {
-    ZNRecord record = _gSetupTool._admin.getResourceIdealState("clusterTest1", "db_11").getRecord();
+  @Test
+  public void testDropAddResource() throws Exception {
+    final String clusterName = "clusterTestDropAddResource";
+
+    // setup cluster
+    addCluster(clusterName);
+    addResource(clusterName, "db_11", 22);
+    addInstancesToCluster(clusterName, "localhost_123", 6, null);
+    rebalanceResource(clusterName, "db_11");
+    ZNRecord record = _gSetupTool._admin.getResourceIdealState(clusterName, "db_11").getRecord();
     String x = ObjectToJson(record);
 
     FileWriter fos = new FileWriter("/tmp/temp.log");
@@ -379,217 +418,370 @@ public class TestHelixAdminScenariosRest extends AdminTestBase {
     pw.write(x);
     pw.close();
 
-    String resourceUrl = getResourceUrl("clusterTest1", "db_11");
-    deleteUrl(resourceUrl, false);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_9900");
+    controller.syncStart();
 
+    // start mock nodes
+    Map<String, MockParticipantManager> participants =
+        new HashMap<String, MockParticipantManager>();
+    for (int i = 0; i < 6; i++) {
+      String instanceName = "localhost_123" + i;
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participant.syncStart();
+      participants.put(instanceName, participant);
+    }
     boolean verifyResult =
         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-            "clusterTest1"));
+            clusterName));
     Assert.assertTrue(verifyResult);
-    Map<String, String> paraMap = new HashMap<String, String>();
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addResource);
-    paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, "db_11");
-    paraMap.put(JsonParameters.PARTITIONS, "22");
-    paraMap.put(JsonParameters.STATE_MODEL_DEF_REF, "MasterSlave");
-    String response =
-        assertSuccessPostOperation(getClusterUrl("clusterTest1") + "/resourceGroups", paraMap,
-            false);
 
-    String idealStateUrl = getResourceUrl("clusterTest1", "db_11") + "/idealState";
-    Assert.assertTrue(response.contains("db_11"));
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addIdealState);
+    String resourceUrl = getResourceUrl(clusterName, "db_11");
+    deleteUrl(resourceUrl, false);
+
+    verifyResult =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(verifyResult);
+    addResource(clusterName, "db_11", 22);
+
+    String idealStateUrl = getResourceUrl(clusterName, "db_11") + "/idealState";
     Map<String, String> extraform = new HashMap<String, String>();
     extraform.put(JsonParameters.NEW_IDEAL_STATE, x);
-    response = assertSuccessPostOperation(idealStateUrl, paraMap, extraform, false);
+    assertSuccessPostOperation(idealStateUrl, addIdealStateCmd(), extraform, false);
 
     verifyResult =
         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-            "clusterTest1"));
+            clusterName));
     Assert.assertTrue(verifyResult);
 
-    ZNRecord record2 =
-        _gSetupTool._admin.getResourceIdealState("clusterTest1", "db_11").getRecord();
+    ZNRecord record2 = _gSetupTool._admin.getResourceIdealState(clusterName, "db_11").getRecord();
     Assert.assertTrue(record2.equals(record));
+
+    // clean up
+    controller.syncStop();
+    for (MockParticipantManager participant : participants.values()) {
+      participant.syncStop();
+    }
   }
 
-  private void testExpandCluster() throws Exception {
-    boolean verifyResult;
+  private Map<String, String> addInstanceCmd(String instances) {
+    Map<String, String> parameters = new HashMap<String, String>();
+    parameters.put(JsonParameters.INSTANCE_NAMES, instances);
+    parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstance);
 
-    String clusterUrl = getClusterUrl("clusterTest1");
-    String instancesUrl = clusterUrl + "/instances";
+    return parameters;
+  }
+
+  private Map<String, String> expandClusterCmd() {
+    Map<String, String> parameters = new HashMap<String, String>();
+    parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.expandCluster);
+
+    return parameters;
+  }
+
+  @Test
+  public void testExpandCluster() throws Exception {
 
-    Map<String, String> paraMap = new HashMap<String, String>();
-    paraMap.put(JsonParameters.INSTANCE_NAMES,
-        "localhost:12331;localhost:12341;localhost:12351;localhost:12361");
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstance);
+    final String clusterName = "clusterTestExpandCluster";
 
-    String response = assertSuccessPostOperation(instancesUrl, paraMap, false);
-    String[] hosts = "localhost:12331;localhost:12341;localhost:12351;localhost:12361".split(";");
+    // setup cluster
+    addCluster(clusterName);
+    addInstancesToCluster(clusterName, "localhost:123", 6, null);
+    addResource(clusterName, "db_11", 22);
+    rebalanceResource(clusterName, "db_11");
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_9900");
+    controller.syncStart();
+
+    // start mock nodes
+    Map<String, MockParticipantManager> participants =
+        new HashMap<String, MockParticipantManager>();
+    for (int i = 0; i < 6; i++) {
+      String instanceName = "localhost_123" + i;
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participant.syncStart();
+      participants.put(instanceName, participant);
+    }
+
+    boolean verifyResult =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(verifyResult);
+
+    String clusterUrl = getClusterUrl(clusterName);
+    String instancesUrl = clusterUrl + "/instances";
+
+    String instances = "localhost:12331;localhost:12341;localhost:12351;localhost:12361";
+    String response = assertSuccessPostOperation(instancesUrl, addInstanceCmd(instances), false);
+    String[] hosts = instances.split(";");
     for (String host : hosts) {
       Assert.assertTrue(response.contains(host.replace(':', '_')));
     }
-    paraMap.clear();
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.expandCluster);
-    response = assertSuccessPostOperation(clusterUrl, paraMap, false);
+
+    response = assertSuccessPostOperation(clusterUrl, expandClusterCmd(), false);
 
     for (int i = 3; i <= 6; i++) {
-      StartCMResult result =
-          TestHelper.startDummyProcess(ZK_ADDR, "clusterTest1", "localhost_123" + i + "1");
-      _startCMResultMap.put("localhost_123" + i + "1", result);
+      String instanceName = "localhost_123" + i + "1";
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participant.syncStart();
+      participants.put(instanceName, participant);
     }
 
     verifyResult =
-        ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
-            "clusterTest1"));
+        ClusterStateVerifier
+            .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, clusterName));
     Assert.assertTrue(verifyResult);
 
     verifyResult =
         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-            "clusterTest1"));
+            clusterName));
     Assert.assertTrue(verifyResult);
+
+    // clean up
+    controller.syncStop();
+    for (MockParticipantManager participant : participants.values()) {
+      participant.syncStop();
+    }
   }
 
-  private void testEnablePartitions() throws IOException, InterruptedException {
-    HelixDataAccessor accessor;
-    accessor = _startCMResultMap.get("localhost_1231")._manager.getHelixDataAccessor();
+  private Map<String, String> enablePartitionCmd(String resourceName, String partitions,
+      boolean enabled) {
+    Map<String, String> parameters = new HashMap<String, String>();
+    parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enablePartition);
+    parameters.put(JsonParameters.ENABLED, "" + enabled);
+    parameters.put(JsonParameters.PARTITION, partitions);
+    parameters.put(JsonParameters.RESOURCE, resourceName);
+
+    return parameters;
+  }
+
+  @Test
+  public void testEnablePartitions() throws IOException, InterruptedException {
+    final String clusterName = "clusterTestEnablePartitions";
+
+    // setup cluster
+    addCluster(clusterName);
+    addInstancesToCluster(clusterName, "localhost:123", 6, null);
+    addResource(clusterName, "db_11", 22);
+    rebalanceResource(clusterName, "db_11");
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_9900");
+    controller.syncStart();
+
+    // start mock nodes
+    Map<String, MockParticipantManager> participants =
+        new HashMap<String, MockParticipantManager>();
+    for (int i = 0; i < 6; i++) {
+      String instanceName = "localhost_123" + i;
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participant.syncStart();
+      participants.put(instanceName, participant);
+    }
+
+    HelixDataAccessor accessor = participants.get("localhost_1231").getHelixDataAccessor();
     // drop node should fail as not disabled
     String hostName = "localhost_1231";
-    String instanceUrl = getInstanceUrl("clusterTest1", hostName);
+    String instanceUrl = getInstanceUrl(clusterName, hostName);
     ExternalView ev = accessor.getProperty(accessor.keyBuilder().externalView("db_11"));
 
-    Map<String, String> paraMap = new HashMap<String, String>();
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enablePartition);
-    paraMap.put(JsonParameters.ENABLED, "false");
-    paraMap.put(JsonParameters.PARTITION, "db_11_0;db_11_15");
-    paraMap.put(JsonParameters.RESOURCE, "db_11");
-
-    String response = assertSuccessPostOperation(instanceUrl, paraMap, false);
+    String response =
+        assertSuccessPostOperation(instanceUrl,
+            enablePartitionCmd("db_11", "db_11_0;db_11_11", false), false);
     Assert.assertTrue(response.contains("DISABLED_PARTITION"));
     Assert.assertTrue(response.contains("db_11_0"));
-    Assert.assertTrue(response.contains("db_11_15"));
+    Assert.assertTrue(response.contains("db_11_11"));
 
     boolean verifyResult =
         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-            "clusterTest1"));
+            clusterName));
     Assert.assertTrue(verifyResult);
 
     ev = accessor.getProperty(accessor.keyBuilder().externalView("db_11"));
     Assert.assertEquals(ev.getStateMap("db_11_0").get(hostName), "OFFLINE");
-    Assert.assertEquals(ev.getStateMap("db_11_15").get(hostName), "OFFLINE");
+    Assert.assertEquals(ev.getStateMap("db_11_11").get(hostName), "OFFLINE");
 
-    paraMap.put(JsonParameters.ENABLED, "true");
-    response = assertSuccessPostOperation(instanceUrl, paraMap, false);
+    response =
+        assertSuccessPostOperation(instanceUrl,
+            enablePartitionCmd("db_11", "db_11_0;db_11_11", true), false);
     Assert.assertFalse(response.contains("db_11_0"));
-    Assert.assertFalse(response.contains("db_11_15"));
+    Assert.assertFalse(response.contains("db_11_11"));
 
     verifyResult =
         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-            "clusterTest1"));
+            clusterName));
     Assert.assertTrue(verifyResult);
 
     ev = accessor.getProperty(accessor.keyBuilder().externalView("db_11"));
     Assert.assertEquals(ev.getStateMap("db_11_0").get(hostName), "MASTER");
-    Assert.assertEquals(ev.getStateMap("db_11_15").get(hostName), "SLAVE");
+    Assert.assertEquals(ev.getStateMap("db_11_11").get(hostName), "SLAVE");
+
+    // clean up
+    controller.syncStop();
+    for (MockParticipantManager participant : participants.values()) {
+      participant.syncStop();
+    }
+  }
+
+  private Map<String, String> enableInstanceCmd(boolean enabled) {
+    Map<String, String> parameters = new HashMap<String, String>();
+    parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enableInstance);
+    parameters.put(JsonParameters.ENABLED, "" + enabled);
+    return parameters;
   }
 
-  private void testInstanceOperations() throws Exception {
+  private Map<String, String> swapInstanceCmd(String oldInstance, String newInstance) {
+    Map<String, String> parameters = new HashMap<String, String>();
+
+    parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.swapInstance);
+    parameters.put(JsonParameters.OLD_INSTANCE, oldInstance);
+    parameters.put(JsonParameters.NEW_INSTANCE, newInstance);
+
+    return parameters;
+  }
+
+  @Test
+  public void testInstanceOperations() throws Exception {
+    final String clusterName = "clusterTestInstanceOperations";
+
+    // setup cluster
+    addCluster(clusterName);
+    addInstancesToCluster(clusterName, "localhost:123", 6, null);
+    addResource(clusterName, "db_11", 8);
+    rebalanceResource(clusterName, "db_11");
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_9900");
+    controller.syncStart();
+
+    // start mock nodes
+    Map<String, MockParticipantManager> participants =
+        new HashMap<String, MockParticipantManager>();
+    for (int i = 0; i < 6; i++) {
+      String instanceName = "localhost_123" + i;
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participant.syncStart();
+      participants.put(instanceName, participant);
+    }
+
     HelixDataAccessor accessor;
     // drop node should fail as not disabled
-    String instanceUrl = getInstanceUrl("clusterTest1", "localhost_1232");
+    String instanceUrl = getInstanceUrl(clusterName, "localhost_1232");
     deleteUrl(instanceUrl, true);
 
     // disabled node
-    Map<String, String> paraMap = new HashMap<String, String>();
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enableInstance);
-    paraMap.put(JsonParameters.ENABLED, "false");
-    String response = assertSuccessPostOperation(instanceUrl, paraMap, false);
+    String response = assertSuccessPostOperation(instanceUrl, enableInstanceCmd(false), false);
     Assert.assertTrue(response.contains("false"));
 
     // Cannot drop / swap
     deleteUrl(instanceUrl, true);
 
-    String instancesUrl = getClusterUrl("clusterTest1") + "/instances";
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.swapInstance);
-    paraMap.put(JsonParameters.OLD_INSTANCE, "localhost_1232");
-    paraMap.put(JsonParameters.NEW_INSTANCE, "localhost_12320");
-    response = assertSuccessPostOperation(instancesUrl, paraMap, true);
+    String instancesUrl = getClusterUrl(clusterName) + "/instances";
+    response =
+        assertSuccessPostOperation(instancesUrl,
+            swapInstanceCmd("localhost_1232", "localhost_12320"), true);
 
     // disconnect the node
-    _startCMResultMap.get("localhost_1232")._manager.disconnect();
-    _startCMResultMap.get("localhost_1232")._thread.interrupt();
+    participants.get("localhost_1232").syncStop();
 
     // add new node then swap instance
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstance);
-    paraMap.put(JsonParameters.INSTANCE_NAME, "localhost_12320");
-    response = assertSuccessPostOperation(instancesUrl, paraMap, false);
+    response = assertSuccessPostOperation(instancesUrl, addInstanceCmd("localhost_12320"), false);
     Assert.assertTrue(response.contains("localhost_12320"));
 
     // swap instance. The instance get swapped out should not exist anymore
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.swapInstance);
-    paraMap.put(JsonParameters.OLD_INSTANCE, "localhost_1232");
-    paraMap.put(JsonParameters.NEW_INSTANCE, "localhost_12320");
-    response = assertSuccessPostOperation(instancesUrl, paraMap, false);
+    response =
+        assertSuccessPostOperation(instancesUrl,
+            swapInstanceCmd("localhost_1232", "localhost_12320"), false);
     Assert.assertTrue(response.contains("localhost_12320"));
     Assert.assertFalse(response.contains("localhost_1232\""));
 
-    accessor = _startCMResultMap.get("localhost_1231")._manager.getHelixDataAccessor();
+    accessor = participants.get("localhost_1231").getHelixDataAccessor();
     String path = accessor.keyBuilder().instanceConfig("localhost_1232").getPath();
     Assert.assertFalse(_gZkClient.exists(path));
 
-    _startCMResultMap.put("localhost_12320",
-        TestHelper.startDummyProcess(ZK_ADDR, "clusterTest1", "localhost_12320"));
+    MockParticipantManager newParticipant =
+        new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12320");
+    newParticipant.syncStart();
+    participants.put("localhost_12320", newParticipant);
+
+    boolean verifyResult =
+        ClusterStateVerifier
+            .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, clusterName));
+    Assert.assertTrue(verifyResult);
+
+    // clean up
+    controller.syncStop();
+    for (MockParticipantManager participant : participants.values()) {
+      participant.syncStop();
+    }
   }
 
-  private void testStartCluster() throws Exception, InterruptedException {
+  @Test
+  public void testStartCluster() throws Exception {
+    final String clusterName = "clusterTestStartCluster";
+    final String controllerClusterName = "controllerClusterTestStartCluster";
+
+    Map<String, MockParticipantManager> participants =
+        new HashMap<String, MockParticipantManager>();
+    Map<String, ClusterDistributedController> distControllers =
+        new HashMap<String, ClusterDistributedController>();
+
+    // setup cluster
+    addCluster(clusterName);
+    addInstancesToCluster(clusterName, "localhost:123", 6, null);
+    addResource(clusterName, "db_11", 8);
+    rebalanceResource(clusterName, "db_11");
+
+    addCluster(controllerClusterName);
+    addInstancesToCluster(controllerClusterName, "controller_900", 2, null);
+
     // start mock nodes
     for (int i = 0; i < 6; i++) {
-      StartCMResult result =
-          TestHelper.startDummyProcess(ZK_ADDR, "clusterTest1", "localhost_123" + i);
-      _startCMResultMap.put("localhost_123" + i, result);
+      String instanceName = "localhost_123" + i;
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participant.syncStart();
+      participants.put(instanceName, participant);
     }
 
     // start controller nodes
     for (int i = 0; i < 2; i++) {
-      StartCMResult result =
-          TestHelper.startController("Klazt3rz", "controller_900" + i, ZK_ADDR,
-              HelixControllerMain.DISTRIBUTED);
-
-      _startCMResultMap.put("controller_900" + i, result);
+      String controllerName = "controller_900" + i;
+      ClusterDistributedController distController =
+          new ClusterDistributedController(ZK_ADDR, controllerClusterName, controllerName);
+      distController.syncStart();
+      distControllers.put(controllerName, distController);
     }
     Thread.sleep(100);
 
     // activate clusters
     // wrong grand clustername
-
-    String clusterUrl = getClusterUrl("clusterTest1");
-    Map<String, String> paraMap = new HashMap<String, String>();
-    paraMap.put(JsonParameters.ENABLED, "true");
-    paraMap.put(JsonParameters.GRAND_CLUSTER, "Klazters");
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.activateCluster);
-
-    String response = assertSuccessPostOperation(clusterUrl, paraMap, true);
+    String clusterUrl = getClusterUrl(clusterName);
+    assertSuccessPostOperation(clusterUrl, activateClusterCmd("nonExistCluster", true), true);
 
     // wrong cluster name
-    clusterUrl = getClusterUrl("clusterTest2");
-    paraMap.put(JsonParameters.GRAND_CLUSTER, "Klazt3rz");
-    response = assertSuccessPostOperation(clusterUrl, paraMap, true);
+    clusterUrl = getClusterUrl("nonExistCluster");
+    assertSuccessPostOperation(clusterUrl, activateClusterCmd(controllerClusterName, true), true);
 
-    paraMap.put(JsonParameters.ENABLED, "true");
-    paraMap.put(JsonParameters.GRAND_CLUSTER, "Klazt3rz");
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.activateCluster);
-    clusterUrl = getClusterUrl("clusterTest1");
-    response = assertSuccessPostOperation(clusterUrl, paraMap, false);
+    clusterUrl = getClusterUrl(clusterName);
+    assertSuccessPostOperation(clusterUrl, activateClusterCmd(controllerClusterName, true), false);
     Thread.sleep(500);
 
     deleteUrl(clusterUrl, true);
 
     // verify leader node
-    HelixDataAccessor accessor =
-        _startCMResultMap.get("controller_9001")._manager.getHelixDataAccessor();
+    HelixDataAccessor accessor = distControllers.get("controller_9001").getHelixDataAccessor();
     LiveInstance controllerLeader = accessor.getProperty(accessor.keyBuilder().controllerLeader());
     Assert.assertTrue(controllerLeader.getInstanceName().startsWith("controller_900"));
 
-    accessor = _startCMResultMap.get("localhost_1232")._manager.getHelixDataAccessor();
+    accessor = participants.get("localhost_1232").getHelixDataAccessor();
     LiveInstance leader = accessor.getProperty(accessor.keyBuilder().controllerLeader());
     for (int i = 0; i < 5; i++) {
       if (leader != null) {
@@ -601,81 +793,98 @@ public class TestHelixAdminScenariosRest extends AdminTestBase {
     Assert.assertTrue(leader.getInstanceName().startsWith("controller_900"));
 
     boolean verifyResult =
-        ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
-            "clusterTest1"));
+        ClusterStateVerifier
+            .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, clusterName));
     Assert.assertTrue(verifyResult);
 
     verifyResult =
         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-            "clusterTest1"));
+            clusterName));
     Assert.assertTrue(verifyResult);
+    Thread.sleep(1000);
+
+    // clean up
+    for (ClusterDistributedController controller : distControllers.values()) {
+      controller.syncStop();
+    }
+    for (MockParticipantManager participant : participants.values()) {
+      participant.syncStop();
+    }
+  }
+
+  private Map<String, String> rebalanceCmd(int replicas, String prefix, String tag) {
+    Map<String, String> parameters = new HashMap<String, String>();
+    parameters.put(JsonParameters.REPLICAS, "" + replicas);
+    if (prefix != null) {
+      parameters.put(JsonParameters.RESOURCE_KEY_PREFIX, prefix);
+    }
+    if (tag != null) {
+      parameters.put(ClusterSetup.instanceGroupTag, tag);
+    }
+    parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.rebalance);
+
+    return parameters;
   }
 
-  private void testRebalanceResource() throws Exception {
-    String resourceUrl = getResourceUrl("clusterTest1", "db_11");
-    Map<String, String> paraMap = new HashMap<String, String>();
-    paraMap.put(JsonParameters.REPLICAS, "3");
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.rebalance);
+  private void rebalanceResource(String clusterName, String resourceName) throws IOException {
+    String resourceUrl = getResourceUrl(clusterName, resourceName);
+    String idealStateUrl = resourceUrl + "/idealState";
+
+    assertSuccessPostOperation(idealStateUrl, rebalanceCmd(3, null, null), false);
+  }
+
+  @Test
+  public void testRebalanceResource() throws Exception {
+    // add a normal cluster
+    final String clusterName = "clusterTestRebalanceResource";
+    addCluster(clusterName);
 
-    String ISUrl = resourceUrl + "/idealState";
-    String response = assertSuccessPostOperation(ISUrl, paraMap, false);
+    addInstancesToCluster(clusterName, "localhost:123", 3, _tag1);
+    addResource(clusterName, "db_11", 44);
+
+    String resourceUrl = getResourceUrl(clusterName, "db_11");
+
+    String idealStateUrl = resourceUrl + "/idealState";
+    String response = assertSuccessPostOperation(idealStateUrl, rebalanceCmd(3, null, null), false);
     ZNRecord record = JsonToObject(ZNRecord.class, response);
     Assert.assertTrue(record.getId().equalsIgnoreCase("db_11"));
-    Assert
-        .assertTrue((((List<String>) (record.getListFields().values().toArray()[0]))).size() == 3);
-    Assert.assertTrue((((Map<String, String>) (record.getMapFields().values().toArray()[0])))
-        .size() == 3);
+    Assert.assertEquals(record.getListField("db_11_0").size(), 3);
+    Assert.assertEquals(record.getMapField("db_11_0").size(), 3);
 
     deleteUrl(resourceUrl, false);
 
     // re-add and rebalance
-    String reourcesUrl = "http://localhost:" + ADMIN_PORT + "/clusters/clusterTest1/resourceGroups";
+    final String reourcesUrl =
+        "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups";
+
     response = getUrl(reourcesUrl);
     Assert.assertFalse(response.contains("db_11"));
 
-    paraMap = new HashMap<String, String>();
-    paraMap.put(JsonParameters.RESOURCE_GROUP_NAME, "db_11");
-    paraMap.put(JsonParameters.STATE_MODEL_DEF_REF, "MasterSlave");
-    paraMap.put(JsonParameters.PARTITIONS, "48");
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addResource);
-
-    response = assertSuccessPostOperation(reourcesUrl, paraMap, false);
-    Assert.assertTrue(response.contains("db_11"));
-
-    ISUrl = resourceUrl + "/idealState";
-    paraMap.put(JsonParameters.REPLICAS, "3");
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.rebalance);
-    response = assertSuccessPostOperation(ISUrl, paraMap, false);
+    addResource(clusterName, "db_11", 48);
+    idealStateUrl = resourceUrl + "/idealState";
+    response = assertSuccessPostOperation(idealStateUrl, rebalanceCmd(3, null, null), false);
     record = JsonToObject(ZNRecord.class, response);
     Assert.assertTrue(record.getId().equalsIgnoreCase("db_11"));
-    Assert
-        .assertTrue((((List<String>) (record.getListFields().values().toArray()[0]))).size() == 3);
-    Assert.assertTrue((((Map<String, String>) (record.getMapFields().values().toArray()[0])))
-        .size() == 3);
+    Assert.assertEquals(record.getListField("db_11_0").size(), 3);
+    Assert.assertEquals(record.getMapField("db_11_0").size(), 3);
 
     // rebalance with key prefix
-    resourceUrl = getResourceUrl("clusterTest1", "db_22");
-    ISUrl = resourceUrl + "/idealState";
-    paraMap.put(JsonParameters.REPLICAS, "2");
-    paraMap.put(JsonParameters.RESOURCE_KEY_PREFIX, "alias");
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.rebalance);
-    response = assertSuccessPostOperation(ISUrl, paraMap, false);
+    addResource(clusterName, "db_22", 55);
+    resourceUrl = getResourceUrl(clusterName, "db_22");
+    idealStateUrl = resourceUrl + "/idealState";
+    response = assertSuccessPostOperation(idealStateUrl, rebalanceCmd(2, "alias", null), false);
     record = JsonToObject(ZNRecord.class, response);
     Assert.assertTrue(record.getId().equalsIgnoreCase("db_22"));
-    Assert
-        .assertTrue((((List<String>) (record.getListFields().values().toArray()[0]))).size() == 2);
-    Assert.assertTrue((((Map<String, String>) (record.getMapFields().values().toArray()[0])))
-        .size() == 2);
+    Assert.assertEquals(record.getListField("alias_0").size(), 2);
+    Assert.assertEquals(record.getMapField("alias_0").size(), 2);
     Assert.assertTrue((((String) (record.getMapFields().keySet().toArray()[0])))
         .startsWith("alias_"));
     Assert.assertFalse(response.contains(IdealStateProperty.INSTANCE_GROUP_TAG.toString()));
-    resourceUrl = getResourceUrl("clusterTest1", "db_33");
-    ISUrl = resourceUrl + "/idealState";
-    paraMap.put(JsonParameters.REPLICAS, "2");
-    paraMap.remove(JsonParameters.RESOURCE_KEY_PREFIX);
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.rebalance);
-    paraMap.put(ClusterSetup.instanceGroupTag, _tag1);
-    response = assertSuccessPostOperation(ISUrl, paraMap, false);
+
+    addResource(clusterName, "db_33", 44);
+    resourceUrl = getResourceUrl(clusterName, "db_33");
+    idealStateUrl = resourceUrl + "/idealState";
+    response = assertSuccessPostOperation(idealStateUrl, rebalanceCmd(2, null, _tag1), false);
 
     Assert.assertTrue(response.contains(IdealStateProperty.INSTANCE_GROUP_TAG.toString()));
     Assert.assertTrue(response.contains(_tag1));
@@ -688,14 +897,10 @@ public class TestHelixAdminScenariosRest extends AdminTestBase {
       }
     }
 
-    resourceUrl = getResourceUrl("clusterTest1", "db_44");
-    ISUrl = resourceUrl + "/idealState";
-    paraMap.put(JsonParameters.REPLICAS, "2");
-    paraMap.remove(JsonParameters.RESOURCE_KEY_PREFIX);
-    paraMap.put(JsonParameters.RESOURCE_KEY_PREFIX, "alias");
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.rebalance);
-    paraMap.put(ClusterSetup.instanceGroupTag, _tag1);
-    response = assertSuccessPostOperation(ISUrl, paraMap, false);
+    addResource(clusterName, "db_44", 44);
+    resourceUrl = getResourceUrl(clusterName, "db_44");
+    idealStateUrl = resourceUrl + "/idealState";
+    response = assertSuccessPostOperation(idealStateUrl, rebalanceCmd(2, "alias", _tag1), false);
     Assert.assertTrue(response.contains(IdealStateProperty.INSTANCE_GROUP_TAG.toString()));
     Assert.assertTrue(response.contains(_tag1));
 
@@ -713,24 +918,66 @@ public class TestHelixAdminScenariosRest extends AdminTestBase {
     }
   }
 
-  private void testAddInstance() throws Exception {
-    String clusterUrl = getClusterUrl("clusterTest1");
-    Map<String, String> paraMap = new HashMap<String, String>();
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstance);
-    String response = null;
-    // Add instances to cluster
+  private void addInstancesToCluster(String clusterName, String instanceNamePrefix, int n,
+      String tag) throws IOException {
+    Map<String, String> parameters = new HashMap<String, String>();
+    final String clusterUrl = getClusterUrl(clusterName);
+    parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstance);
+
+    // add instances to cluster
     String instancesUrl = clusterUrl + "/instances";
-    for (int i = 0; i < 3; i++) {
+    for (int i = 0; i < n; i++) {
 
-      paraMap.put(JsonParameters.INSTANCE_NAME, "localhost:123" + i);
-      response = assertSuccessPostOperation(instancesUrl, paraMap, false);
-      Assert.assertTrue(response.contains(("localhost:123" + i).replace(':', '_')));
+      parameters.put(JsonParameters.INSTANCE_NAME, instanceNamePrefix + i);
+      String response = assertSuccessPostOperation(instancesUrl, parameters, false);
+      Assert.assertTrue(response.contains((instanceNamePrefix + i).replace(':', '_')));
     }
-    paraMap.remove(JsonParameters.INSTANCE_NAME);
-    paraMap.put(JsonParameters.INSTANCE_NAMES,
-        "localhost:1233;localhost:1234;localhost:1235;localhost:1236");
 
-    response = assertSuccessPostOperation(instancesUrl, paraMap, false);
+    // add tag to instance
+    if (tag != null && !tag.isEmpty()) {
+      parameters.clear();
+      parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstanceTag);
+      parameters.put(ClusterSetup.instanceGroupTag, tag);
+      for (int i = 0; i < n; i++) {
+        String instanceUrl = instancesUrl + "/" + (instanceNamePrefix + i).replace(':', '_');
+        String response = assertSuccessPostOperation(instanceUrl, parameters, false);
+        Assert.assertTrue(response.contains(_tag1));
+      }
+    }
+
+  }
+
+  private Map<String, String> addInstanceTagCmd(String tag) {
+    Map<String, String> parameters = new HashMap<String, String>();
+    parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstanceTag);
+    parameters.put(ClusterSetup.instanceGroupTag, tag);
+
+    return parameters;
+  }
+
+  private Map<String, String> removeInstanceTagCmd(String tag) {
+    Map<String, String> parameters = new HashMap<String, String>();
+    parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.removeInstanceTag);
+    parameters.put(ClusterSetup.instanceGroupTag, tag);
+
+    return parameters;
+  }
+
+  @Test
+  public void testAddInstance() throws Exception {
+    final String clusterName = "clusterTestAddInstance";
+
+    // add normal cluster
+    addCluster(clusterName);
+
+    String clusterUrl = getClusterUrl(clusterName);
+
+    // Add instances to cluster
+    String instancesUrl = clusterUrl + "/instances";
+    addInstancesToCluster(clusterName, "localhost:123", 3, null);
+
+    String instances = "localhost:1233;localhost:1234;localhost:1235;localhost:1236";
+    String response = assertSuccessPostOperation(instancesUrl, addInstanceCmd(instances), false);
     for (int i = 3; i <= 6; i++) {
       Assert.assertTrue(response.contains("localhost_123" + i));
     }
@@ -749,42 +996,34 @@ public class TestHelixAdminScenariosRest extends AdminTestBase {
 
     // disable node
     instanceUrl = instancesUrl + "/localhost_1236";
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enableInstance);
-    paraMap.put(JsonParameters.ENABLED, "false");
-    response = assertSuccessPostOperation(instanceUrl, paraMap, false);
+    response = assertSuccessPostOperation(instanceUrl, enableInstanceCmd(false), false);
     Assert.assertTrue(response.contains("false"));
 
     deleteUrl(instanceUrl, false);
 
+    // add controller cluster
+    final String controllerClusterName = "controllerClusterTestAddInstance";
+    addCluster(controllerClusterName);
+
     // add node to controller cluster
-    paraMap.remove(JsonParameters.INSTANCE_NAME);
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstance);
-    paraMap.put(JsonParameters.INSTANCE_NAMES, "controller:9000;controller:9001");
-    String controllerUrl = getClusterUrl("Klazt3rz") + "/instances";
-    response = assertSuccessPostOperation(controllerUrl, paraMap, false);
+    String controllers = "controller:9000;controller:9001";
+    String controllerUrl = getClusterUrl(controllerClusterName) + "/instances";
+    response = assertSuccessPostOperation(controllerUrl, addInstanceCmd(controllers), false);
     Assert.assertTrue(response.contains("controller_9000"));
     Assert.assertTrue(response.contains("controller_9001"));
 
-    // add a dup host
-    paraMap.remove(JsonParameters.INSTANCE_NAMES);
-    paraMap.put(JsonParameters.INSTANCE_NAME, "localhost:1234");
-    response = assertSuccessPostOperation(instancesUrl, paraMap, true);
-
-    // add tags
+    // add a duplicated host
+    response = assertSuccessPostOperation(instancesUrl, addInstanceCmd("localhost:1234"), true);
 
-    paraMap.clear();
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstanceTag);
-    paraMap.put(ClusterSetup.instanceGroupTag, _tag1);
+    // add/remove tags
     for (int i = 0; i < 4; i++) {
       instanceUrl = instancesUrl + "/localhost_123" + i;
-      response = assertSuccessPostOperation(instanceUrl, paraMap, false);
+      response = assertSuccessPostOperation(instanceUrl, addInstanceTagCmd(_tag1), false);
       Assert.assertTrue(response.contains(_tag1));
-
     }
-    paraMap.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.removeInstanceTag);
+
     instanceUrl = instancesUrl + "/localhost_1233";
-    response = assertSuccessPostOperation(instanceUrl, paraMap, false);
+    response = assertSuccessPostOperation(instanceUrl, removeInstanceTagCmd(_tag1), false);
     Assert.assertFalse(response.contains(_tag1));
-
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetInstance.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetInstance.java b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetInstance.java
index 9534cf5..fd12080 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetInstance.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetInstance.java
@@ -25,8 +25,8 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.TestHelper;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.ErrTransition;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -53,12 +53,8 @@ public class TestResetInstance extends AdminTestBase {
         3, // replicas
         "MasterSlave", true); // do rebalance
 
-    // // start admin thread
-    // AdminThread adminThread = new AdminThread(ZK_ADDR, _port);
-    // adminThread.start();
-
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller = new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
@@ -69,16 +65,16 @@ public class TestResetInstance extends AdminTestBase {
     };
 
     // start mock participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
       if (i == 0) {
         participants[i] =
-            new MockParticipant(clusterName, instanceName, ZK_ADDR,
-                new ErrTransition(errPartitions));
+            new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participants[i].setTransition(new ErrTransition(errPartitions));
       } else {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       }
       participants[i].syncStart();
     }
@@ -111,9 +107,6 @@ public class TestResetInstance extends AdminTestBase {
     Assert.assertTrue(result, "Cluster verification fails");
 
     // clean up
-    // wait for all zk callbacks done
-    Thread.sleep(1000);
-    // adminThread.stop();
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java
index 9b07445..82a2607 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetPartitionState.java
@@ -30,10 +30,10 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.State;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.ErrTransition;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
@@ -95,12 +95,8 @@ public class TestResetPartitionState extends AdminTestBase {
         3, // replicas
         "MasterSlave", true); // do rebalance
 
-    // start admin thread
-    // AdminThread adminThread = new AdminThread(ZK_ADDR, _port);
-    // adminThread.start();
-
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller = new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
@@ -108,16 +104,16 @@ public class TestResetPartitionState extends AdminTestBase {
     errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
 
     // start mock participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
       if (i == 0) {
         participants[i] =
-            new MockParticipant(clusterName, instanceName, ZK_ADDR,
-                new ErrTransition(errPartitions));
+            new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participants[i].setTransition(new ErrTransition(errPartitions));
       } else {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       }
       participants[i].syncStart();
     }
@@ -171,9 +167,6 @@ public class TestResetPartitionState extends AdminTestBase {
     Assert.assertEquals(_errToOfflineInvoked.get(), 2, "reset() should be invoked 2 times");
 
     // clean up
-    // wait for all zk callbacks done
-    Thread.sleep(1000);
-    // adminThread.stop();
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
@@ -185,8 +178,7 @@ public class TestResetPartitionState extends AdminTestBase {
   private void clearStatusUpdate(String clusterName, String instance, String resource,
       String partition) {
     // clear status update for error partition so verify() will not fail on
-    // old
-    // errors
+    // old errors
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetResource.java
index 96f4f6c..db9e9bb 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetResource.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/tools/TestResetResource.java
@@ -25,8 +25,8 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.TestHelper;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.ErrTransition;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -53,12 +53,8 @@ public class TestResetResource extends AdminTestBase {
         3, // replicas
         "MasterSlave", true); // do rebalance
 
-    // start admin thread
-    // AdminThread adminThread = new AdminThread(ZK_ADDR, _port);
-    // adminThread.start();
-
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller = new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
@@ -69,16 +65,16 @@ public class TestResetResource extends AdminTestBase {
     };
 
     // start mock participants
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
       if (i == 0) {
         participants[i] =
-            new MockParticipant(clusterName, instanceName, ZK_ADDR,
-                new ErrTransition(errPartitions));
+            new MockParticipantManager(ZK_ADDR, clusterName, instanceName); 
+        participants[i].setTransition(new ErrTransition(errPartitions));
       } else {
-        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       }
       participants[i].syncStart();
     }
@@ -112,15 +108,11 @@ public class TestResetResource extends AdminTestBase {
     Assert.assertTrue(result, "Cluster verification fails");
 
     // clean up
-    // wait for all zk callbacks done
-    Thread.sleep(1000);
-    // adminThread.stop();
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java
----------------------------------------------------------------------
diff --git a/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java b/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java
index 083cbd4..27b4d36 100644
--- a/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java
+++ b/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java
@@ -21,27 +21,30 @@ package org.apache.helix.agent;
 
 import java.io.File;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.ExternalCommand;
 import org.apache.helix.ScriptTestHelper;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class TestHelixAgent extends ZkUnitTestBase {
+  private final static Logger LOG = Logger.getLogger(TestHelixAgent.class);
+
   final String workingDir = ScriptTestHelper.getPrefix() + ScriptTestHelper.INTEGRATION_SCRIPT_DIR;
   ExternalCommand serverCmd = null;
 
@@ -94,13 +97,9 @@ public class TestHelixAgent extends ZkUnitTestBase {
         "MasterSlave", true); // do rebalance
 
     // set cluster config
-    ZkClient client =
-        new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
-            new ZNRecordSerializer());
-
     HelixConfigScope scope =
         new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
-    ConfigAccessor configAccessor = new ConfigAccessor(client);
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
 
     // String pidFile = ScriptTestHelper.getPrefix() + ScriptTestHelper.INTEGRATION_LOG_DIR +
     // "/default/foo_{PARTITION_NAME}_pid.txt";
@@ -148,10 +147,11 @@ public class TestHelixAgent extends ZkUnitTestBase {
     configAccessor.set(scope, cmdConfig.toKeyValueMap());
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", zkAddr);
+    ClusterControllerManager controller = new ClusterControllerManager(zkAddr, clusterName, "controller_0");
     controller.syncStart();
 
     // start helix-agent
+    Map<String, Thread> agents = new HashMap<String, Thread>();
     for (int i = 0; i < n; i++) {
       final String instanceName = "localhost_" + (12918 + i);
       Thread agentThread = new Thread() {
@@ -163,11 +163,11 @@ public class TestHelixAgent extends ZkUnitTestBase {
                 "--stateModel", "MasterSlave"
             });
           } catch (Exception e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+            LOG.error("Exception start helix-agent", e);
           }
         }
       };
+      agents.put(instanceName, agentThread);
       agentThread.start();
 
       // wait participant thread to start
@@ -197,6 +197,11 @@ public class TestHelixAgent extends ZkUnitTestBase {
             clusterName));
     Assert.assertTrue(result);
 
+    // clean up
+    controller.syncStop();
+    for (Thread agentThread : agents.values()) {
+      agentThread.interrupt();
+    }
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/HelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index 24f8f1e..54e9943 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -102,6 +102,7 @@ public interface HelixManager {
    * @param listener
    * @deprecated replaced by addInstanceConfigChangeListener()
    */
+  @Deprecated
   void addConfigChangeListener(ConfigChangeListener listener) throws Exception;
 
   /**
@@ -155,6 +156,12 @@ public interface HelixManager {
   void addControllerListener(ControllerChangeListener listener);
 
   /**
+   * Add message listener for controller
+   * @param listener
+   */
+  void addControllerMessageListener(MessageListener listener);
+
+  /**
    * Removes the listener. If the same listener was used for multiple changes,
    * all change notifications will be removed.<br/>
    * This will invoke onChange method on the listener with

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/HelixProperty.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixProperty.java b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
index 9c0c25e..00189b6 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
@@ -28,11 +28,14 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.api.config.NamespacedConfig;
+import org.apache.log4j.Logger;
 
 /**
  * A wrapper class for ZNRecord. Used as a base class for IdealState, CurrentState, etc.
  */
 public class HelixProperty {
+  private static Logger LOG = Logger.getLogger(HelixProperty.class);
+
   public enum HelixPropertyAttribute {
     BUCKET_SIZE,
     BATCH_MESSAGE_MODE
@@ -138,8 +141,7 @@ public class HelixProperty {
       });
       return constructor.newInstance(record);
     } catch (Exception e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      LOG.error("Exception convert znrecord: " + record + " to class: " + clazz, e);
     }
 
     return null;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
index c5a7f22..a0ad6f3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/SemiAutoRebalancer.java
@@ -9,6 +9,7 @@ import org.apache.helix.api.Cluster;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.context.SemiAutoRebalancerContext;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
@@ -59,6 +60,7 @@ public class SemiAutoRebalancer implements HelixRebalancer {
       LOG.debug("Processing resource:" + config.getResourceId());
     }
     ResourceAssignment partitionMapping = new ResourceAssignment(config.getResourceId());
+
     for (PartitionId partition : config.getPartitionSet()) {
       Map<ParticipantId, State> currentStateMap =
           currentState.getCurrentStateMap(config.getResourceId(), partition);
@@ -71,6 +73,7 @@ public class SemiAutoRebalancer implements HelixRebalancer {
       Map<State, String> upperBounds =
           ConstraintBasedAssignment.stateConstraints(stateModelDef, config.getResourceId(),
               cluster.getConfig());
+
       Map<ParticipantId, State> bestStateForPartition =
           ConstraintBasedAssignment.computeAutoBestStateForPartition(upperBounds, cluster
               .getLiveParticipantMap().keySet(), stateModelDef, preferenceList, currentStateMap,

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index a0ee1c1..84129de 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.rebalancer.util;
  */
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -110,38 +111,56 @@ public class ConstraintBasedAssignment {
   }
 
   /**
-   * compute best state for resource in SEMI_AUTO and FULL_AUTO modes
-   * @param upperBounds map of state to upper bound
-   * @param liveParticipantSet set of live participant ids
-   * @param stateModelDef
-   * @param participantPreferenceList
-   * @param currentStateMap
-   *          : participant->state for each partition
-   * @param disabledParticipantsForPartition
-   * @return
+   * Compute the target states for participants that currently hold a partition but have been
+   * dropped from, or disabled for, that partition.
+   * @param currentStateMap current map of participant id to state for a partition
+   * @param participants participants selected to serve the partition
+   * @param disabledParticipants participants that have been disabled for this partition
+   * @param initialState the initial state of the resource state model
+   * @return map of participant id to the state assigned to each dropped or disabled participant
    */
-  public static Map<ParticipantId, State> computeAutoBestStateForPartition(
-      Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
-      StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
-      Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
+  public static Map<ParticipantId, State> dropAndDisablePartitions(
+      Map<ParticipantId, State> currentStateMap, Collection<ParticipantId> participants,
+      Set<ParticipantId> disabledParticipants, State initialState) {
     Map<ParticipantId, State> participantStateMap = new HashMap<ParticipantId, State>();
-
     // if the resource is deleted, instancePreferenceList will be empty and
     // we should drop all resources.
     if (currentStateMap != null) {
       for (ParticipantId participantId : currentStateMap.keySet()) {
-        if ((participantPreferenceList == null || !participantPreferenceList
-            .contains(participantId)) && !disabledParticipantsForPartition.contains(participantId)) {
+        if ((participants == null || !participants.contains(participantId))
+            && !disabledParticipants.contains(participantId)) {
           // if dropped and not disabled, transit to DROPPED
           participantStateMap.put(participantId, State.from(HelixDefinedState.DROPPED));
         } else if ((currentStateMap.get(participantId) == null || !currentStateMap.get(
             participantId).equals(State.from(HelixDefinedState.ERROR)))
-            && disabledParticipantsForPartition.contains(participantId)) {
+            && disabledParticipants.contains(participantId)) {
           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
-          participantStateMap.put(participantId, stateModelDef.getTypedInitialState());
+          participantStateMap.put(participantId, initialState);
         }
       }
     }
+    return participantStateMap;
+  }
+
+  /**
+   * compute best state for resource in SEMI_AUTO and FULL_AUTO modes
+   * @param upperBounds map of state to upper bound
+   * @param liveParticipantSet set of live participant ids
+   * @param stateModelDef
+   * @param participantPreferenceList
+   * @param currentStateMap
+   *          : participant->state for each partition
+   * @param disabledParticipantsForPartition
+   * @return
+   */
+  public static Map<ParticipantId, State> computeAutoBestStateForPartition(
+      Map<State, String> upperBounds, Set<ParticipantId> liveParticipantSet,
+      StateModelDefinition stateModelDef, List<ParticipantId> participantPreferenceList,
+      Map<ParticipantId, State> currentStateMap, Set<ParticipantId> disabledParticipantsForPartition) {
+    // drop and disable participants if necessary
+    Map<ParticipantId, State> participantStateMap =
+        dropAndDisablePartitions(currentStateMap, participantPreferenceList,
+            disabledParticipantsForPartition, stateModelDef.getTypedInitialState());
 
     // resource is deleted
     if (participantPreferenceList == null) {