You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/08/04 22:19:19 UTC

[2/7] [HELIX-376] Remove HelixConnection/HelixManager duplicate code

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
index 04c0352..495d37c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
@@ -25,11 +25,9 @@ import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
-import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.testutil.ZkTestBase;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -54,9 +52,9 @@ public class TestStartMultipleControllersWithSameName extends ZkTestBase {
     // rebalance
 
     // start controller
-    ClusterControllerManager[] controllers = new ClusterControllerManager[4];
+    MockController[] controllers = new MockController[4];
     for (int i = 0; i < 4; i++) {
-      controllers[i] = new ClusterControllerManager(_zkaddr, clusterName, "controller_0");
+      controllers[i] = new MockController(_zkaddr, clusterName, "controller_0");
       controllers[i].syncStart();
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
index e5ff171..c4304b0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
@@ -32,8 +32,8 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
 import org.apache.helix.mock.participant.MockMSStateModel;
 import org.apache.helix.mock.participant.MockTransition;
@@ -172,7 +172,7 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
   @Test
   public void testStateTransitionTimeOut() throws Exception {
     Map<String, SleepStateModelFactory> factories = new HashMap<String, SleepStateModelFactory>();
-    // MockParticipantManager[] participants = new MockParticipantManager[NODE_NR];
+    // MockParticipant[] participants = new MockParticipant[NODE_NR];
     IdealState idealState =
         _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
     for (int i = 0; i < NODE_NR; i++) {
@@ -185,12 +185,12 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
         }
       }
 
-      _participants[i] = new MockParticipantManager(_zkaddr, CLUSTER_NAME, instanceName);
+      _participants[i] = new MockParticipant(_zkaddr, CLUSTER_NAME, instanceName);
       _participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", factory);
       _participants[i].syncStart();
     }
     String controllerName = "controller_0";
-    _controller = new ClusterControllerManager(_zkaddr, CLUSTER_NAME, controllerName);
+    _controller = new MockController(_zkaddr, CLUSTER_NAME, controllerName);
     _controller.syncStart();
 
     boolean result =

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
index 283055c..1927d72 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
@@ -22,8 +22,7 @@ package org.apache.helix.integration;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.MockParticipant;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
@@ -47,7 +46,6 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase {
     idealStateOld2.merge(is2.getRecord());
 
     String instanceName = "localhost_" + (START_PORT + 0);
-    ZKHelixAdmin tool = new ZKHelixAdmin(_zkclient);
     _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, false);
 
     boolean result =
@@ -77,8 +75,8 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase {
       exception = true;
     }
     Assert.assertFalse(exception);
-    MockParticipantManager newParticipant =
-        new MockParticipantManager(_zkaddr, CLUSTER_NAME, instanceName2);
+    MockParticipant newParticipant =
+        new MockParticipant(_zkaddr, CLUSTER_NAME, instanceName2);
     newParticipant.syncStart();
 
     result =

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 111c4d2..9e8fd85 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -31,11 +31,8 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.integration.manager.ZkTestManager;
-import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.testutil.ZkTestBase;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -63,16 +60,16 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
         2, // replicas
         "MasterSlave", true); // do rebalance
 
-    final ClusterControllerManager controller =
-        new ClusterControllerManager(_zkaddr, clusterName, "controller_0");
+    final MockController controller =
+        new MockController(_zkaddr, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
-    MockParticipantManager[] participants = new MockParticipantManager[n];
+    MockParticipant[] participants = new MockParticipant[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+      participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -81,7 +78,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
             .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr,
                 clusterName));
     Assert.assertTrue(result);
-    final MockParticipantManager participantManagerToExpire = participants[1];
+    final MockParticipant participantManagerToExpire = participants[1];
 
     // check controller zk-watchers
     result = TestHelper.verify(new TestHelper.Verifier() {
@@ -116,12 +113,12 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
     Assert.assertTrue(result, "Participant should have 1 zk-watcher. MESSAGES->HelixTaskExecutor");
 
     // check HelixManager#_handlers
-    // printHandlers(controllerManager);
-    // printHandlers(participantManagerToExpire);
+    TestHelper.printHandlers(controller, controller.getHandlers());
+    TestHelper.printHandlers(participantManagerToExpire, participantManagerToExpire.getHandlers());
     int controllerHandlerNb = controller.getHandlers().size();
     int particHandlerNb = participantManagerToExpire.getHandlers().size();
-    Assert.assertEquals(controllerHandlerNb, 9,
-        "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant");
+    Assert.assertEquals(controllerHandlerNb, 10,
+        "HelixController should have 10 (6+2n) callback handlers for 2 (n) participant");
     Assert.assertEquals(particHandlerNb, 1,
         "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers");
 
@@ -208,16 +205,16 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
         2, // replicas
         "MasterSlave", true); // do rebalance
 
-    final ClusterControllerManager controller =
-        new ClusterControllerManager(_zkaddr, clusterName, "controller_0");
+    final MockController controller =
+        new MockController(_zkaddr, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
-    MockParticipantManager[] participants = new MockParticipantManager[n];
+    MockParticipant[] participants = new MockParticipant[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+      participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -228,7 +225,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
     Assert.assertTrue(result);
     // final ZkHelixTestManager controllerManager = controller.getManager();
     // final ZkHelixTestManager participantManager = participants[0].getManager();
-    final MockParticipantManager participantManager = participants[0];
+    final MockParticipant participantManager = participants[0];
 
     // wait until we get all the listeners registered
     result = TestHelper.verify(new TestHelper.Verifier() {
@@ -237,7 +234,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
       public boolean verify() throws Exception {
         int controllerHandlerNb = controller.getHandlers().size();
         int particHandlerNb = participantManager.getHandlers().size();
-        if (controllerHandlerNb == 9 && particHandlerNb == 2)
+        if (controllerHandlerNb == 10 && particHandlerNb == 2)
           return true;
         else
           return false;
@@ -246,12 +243,14 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
 
     int controllerHandlerNb = controller.getHandlers().size();
     int particHandlerNb = participantManager.getHandlers().size();
-    Assert.assertEquals(controllerHandlerNb, 9,
-        "HelixController should have 9 (5+2n) callback handlers for 2 participant, but was "
-            + controllerHandlerNb + ", " + printHandlers(controller));
+    TestHelper.printHandlers(controller, controller.getHandlers());
+    Assert.assertEquals(controllerHandlerNb, 10,
+        "HelixController should have 10 (6+2n) callback handlers for 2 participant, but was "
+            + controllerHandlerNb);
+    TestHelper.printHandlers(participantManager, participantManager.getHandlers());
     Assert.assertEquals(particHandlerNb, 1,
         "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handler, but was "
-            + particHandlerNb + ", " + printHandlers(participantManager));
+            + particHandlerNb);
 
     // expire controller
     System.out.println("Expiring controller session...");
@@ -301,13 +300,13 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
     // check HelixManager#_handlers
     // printHandlers(controllerManager);
     int handlerNb = controller.getHandlers().size();
+    TestHelper.printHandlers(controller, controller.getHandlers());
     Assert.assertEquals(handlerNb, controllerHandlerNb,
-        "controller callback handlers should not increase after participant session expiry, but was "
-            + printHandlers(controller));
+        "controller callback handlers should not increase after participant session expiry");
     handlerNb = participantManager.getHandlers().size();
+    TestHelper.printHandlers(participantManager, participantManager.getHandlers());
     Assert.assertEquals(handlerNb, particHandlerNb,
-        "participant callback handlers should not increase after participant session expiry, but was "
-            + printHandlers(participantManager));
+        "participant callback handlers should not increase after participant session expiry");
 
     // clean up
     controller.syncStop();
@@ -333,20 +332,19 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
         2, // replicas
         "MasterSlave", true);
 
-    final ClusterControllerManager controller =
-        new ClusterControllerManager(zkAddr, clusterName, "controller_0");
+    final MockController controller = new MockController(zkAddr, clusterName, "controller_0");
     controller.syncStart();
 
-    MockParticipantManager[] participants = new MockParticipantManager[n];
+    MockParticipant[] participants = new MockParticipant[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
+      participants[i] = new MockParticipant(zkAddr, clusterName, instanceName);
       participants[i].syncStart();
 
       // register a controller listener on participant_0
       if (i == 0) {
         // ZkHelixTestManager manager = participants[0].getManager();
-        MockParticipantManager manager = participants[0];
+        MockParticipant manager = participants[0];
         manager.addCurrentStateChangeListener(new CurrentStateChangeListener() {
           @Override
           public void onStateChange(String instanceName, List<CurrentState> statesInfo,
@@ -365,7 +363,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
                 clusterName));
     Assert.assertTrue(result);
 
-    MockParticipantManager participantToExpire = participants[0];
+    MockParticipant participantToExpire = participants[0];
     String oldSessionId = participantToExpire.getSessionId();
     PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
 
@@ -446,8 +444,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
     Assert.assertEquals(childListeners.get(path).size(), 1,
         "Should have 1 child-listener on path: " + path);
     path = keyBuilder.controller().getPath();
-    Assert.assertNull(childListeners.get(path),
-        "Should have no child-listener on path: " + path);
+    Assert.assertNull(childListeners.get(path), "Should have no child-listener on path: " + path);
 
     // check zookeeper#watches on client side
     watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
@@ -494,51 +491,4 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
-
-  // debug code
-  static String printHandlers(ZkTestManager manager) {
-    StringBuilder sb = new StringBuilder();
-    List<CallbackHandler> handlers = manager.getHandlers();
-    sb.append(manager.getInstanceName() + " has " + handlers.size() + " cb-handlers. [");
-
-    for (int i = 0; i < handlers.size(); i++) {
-      CallbackHandler handler = handlers.get(i);
-      String path = handler.getPath();
-      sb.append(path.substring(manager.getClusterName().length() + 1) + ": "
-          + handler.getListener());
-      if (i < (handlers.size() - 1)) {
-        sb.append(", ");
-      }
-    }
-    sb.append("]");
-
-    return sb.toString();
-  }
-
-  void printZkListeners(ZkClient client) throws Exception {
-    Map<String, Set<IZkDataListener>> datalisteners = ZkTestHelper.getZkDataListener(client);
-    Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(client);
-
-    System.out.println("dataListeners {");
-    for (String path : datalisteners.keySet()) {
-      System.out.println("\t" + path + ": ");
-      Set<IZkDataListener> set = datalisteners.get(path);
-      for (IZkDataListener listener : set) {
-        CallbackHandler handler = (CallbackHandler) listener;
-        System.out.println("\t\t" + handler.getListener());
-      }
-    }
-    System.out.println("}");
-
-    System.out.println("childListeners {");
-    for (String path : childListeners.keySet()) {
-      System.out.println("\t" + path + ": ");
-      Set<IZkChildListener> set = childListeners.get(path);
-      for (IZkChildListener listener : set) {
-        CallbackHandler handler = (CallbackHandler) listener;
-        System.out.println("\t\t" + handler.getListener());
-      }
-    }
-    System.out.println("}");
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
index 8b4e889..d1ae897 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
@@ -30,8 +30,8 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
@@ -110,17 +110,17 @@ public class TestZkSessionExpiry extends ZkTestBase {
         2, // replicas
         "MasterSlave", true); // do rebalance
 
-    ClusterControllerManager controller =
-        new ClusterControllerManager(_zkaddr, clusterName, "controller_0");
+    MockController controller =
+        new MockController(_zkaddr, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     Set<String> handledMsgSet = new HashSet<String>();
-    MockParticipantManager[] participants = new MockParticipantManager[n];
+    MockParticipant[] participants = new MockParticipant[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+      participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
       participants[i].getMessagingService().registerMessageHandlerFactory(DUMMY_MSG_TYPE,
           new DummyMessageHandlerFactory(handledMsgSet));
       participants[i].syncStart();

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
index 8eaf2e7..3098b5c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
@@ -21,8 +21,8 @@ package org.apache.helix.integration;
 
 import java.util.Date;
 
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.testutil.ZkTestBase;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -49,8 +49,8 @@ public class ZkStandAloneCMTestBase extends ZkTestBase {
   protected final String CLASS_NAME = this.getClass().getSimpleName();
   protected final String CLUSTER_NAME = CLASS_NAME;
 
-  protected MockParticipantManager[] _participants = new MockParticipantManager[NODE_NR];
-  protected ClusterControllerManager _controller;
+  protected MockParticipant[] _participants = new MockParticipant[NODE_NR];
+  protected MockController _controller;
 
   int _replica = 3;
 
@@ -76,13 +76,13 @@ public class ZkStandAloneCMTestBase extends ZkTestBase {
     // start dummy participants
     for (int i = 0; i < NODE_NR; i++) {
       String instanceName = "localhost_" + (START_PORT + i);
-      _participants[i] = new MockParticipantManager(_zkaddr, CLUSTER_NAME, instanceName);
+      _participants[i] = new MockParticipant(_zkaddr, CLUSTER_NAME, instanceName);
       _participants[i].syncStart();
     }
 
     // start controller
     String controllerName = "controller_0";
-    _controller = new ClusterControllerManager(_zkaddr, CLUSTER_NAME, controllerName);
+    _controller = new MockController(_zkaddr, CLUSTER_NAME, controllerName);
     _controller.syncStart();
 
     boolean result =

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
deleted file mode 100644
index b8f0f2b..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package org.apache.helix.integration.manager;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.helix.HelixTimerTask;
-import org.apache.helix.InstanceType;
-import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.log4j.Logger;
-
-public class ClusterControllerManager extends ZKHelixManager implements Runnable, ZkTestManager {
-  private static Logger LOG = Logger.getLogger(ClusterControllerManager.class);
-
-  private final CountDownLatch _startCountDown = new CountDownLatch(1);
-  private final CountDownLatch _stopCountDown = new CountDownLatch(1);
-  private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
-
-  public ClusterControllerManager(String zkAddr, String clusterName, String controllerName) {
-    super(clusterName, controllerName, InstanceType.CONTROLLER, zkAddr);
-  }
-
-  public void syncStop() {
-    _stopCountDown.countDown();
-    try {
-      _waitStopFinishCountDown.await();
-    } catch (InterruptedException e) {
-      LOG.error("Interrupted waiting for finish", e);
-    }
-  }
-
-  public void syncStart() {
-    // TODO: prevent start multiple times
-    new Thread(this).start();
-    try {
-      _startCountDown.await();
-    } catch (InterruptedException e) {
-      LOG.error("Interrupted waiting for start", e);
-    }
-  }
-
-  @Override
-  public void run() {
-    try {
-      connect();
-      _startCountDown.countDown();
-      _stopCountDown.await();
-    } catch (Exception e) {
-      LOG.error("exception running controller-manager", e);
-    } finally {
-      _startCountDown.countDown();
-      disconnect();
-      _waitStopFinishCountDown.countDown();
-    }
-  }
-
-  @Override
-  public ZkClient getZkClient() {
-    return _zkclient;
-  }
-
-  @Override
-  public List<CallbackHandler> getHandlers() {
-    return _handlers;
-  }
-
-  public List<HelixTimerTask> getControllerTimerTasks() {
-    return _controllerTimerTasks;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
deleted file mode 100644
index a17ccc1..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package org.apache.helix.integration.manager;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.helix.InstanceType;
-import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.participant.DistClusterControllerStateModelFactory;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.log4j.Logger;
-
-public class ClusterDistributedController extends ZKHelixManager implements Runnable, ZkTestManager {
-  private static Logger LOG = Logger.getLogger(ClusterDistributedController.class);
-
-  private final CountDownLatch _startCountDown = new CountDownLatch(1);
-  private final CountDownLatch _stopCountDown = new CountDownLatch(1);
-  private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
-
-  public ClusterDistributedController(String zkAddr, String clusterName, String controllerName) {
-    super(clusterName, controllerName, InstanceType.CONTROLLER_PARTICIPANT, zkAddr);
-  }
-
-  public void syncStop() {
-    _stopCountDown.countDown();
-    try {
-      _waitStopFinishCountDown.await();
-    } catch (InterruptedException e) {
-      LOG.error("Interrupted waiting for finish", e);
-    }
-  }
-
-  public void syncStart() {
-    // TODO: prevent start multiple times
-    new Thread(this).start();
-    try {
-      _startCountDown.await();
-    } catch (InterruptedException e) {
-      LOG.error("Interrupted waiting for start", e);
-    }
-  }
-
-  @Override
-  public void run() {
-    try {
-      StateMachineEngine stateMach = getStateMachineEngine();
-      DistClusterControllerStateModelFactory lsModelFactory =
-          new DistClusterControllerStateModelFactory(_zkAddress);
-      stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
-
-      connect();
-      _startCountDown.countDown();
-      _stopCountDown.await();
-    } catch (Exception e) {
-      LOG.error("exception running controller-manager", e);
-    } finally {
-      _startCountDown.countDown();
-      disconnect();
-      _waitStopFinishCountDown.countDown();
-    }
-  }
-
-  @Override
-  public ZkClient getZkClient() {
-    return _zkclient;
-  }
-
-  @Override
-  public List<CallbackHandler> getHandlers() {
-    return _handlers;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
deleted file mode 100644
index 917be17..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.apache.helix.integration.manager;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.helix.InstanceType;
-import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
-import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
-import org.apache.helix.mock.participant.MockMSModelFactory;
-import org.apache.helix.mock.participant.MockSchemataModelFactory;
-import org.apache.helix.mock.participant.MockTransition;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.log4j.Logger;
-
-public class MockParticipantManager extends ZKHelixManager implements Runnable, ZkTestManager {
-  private static Logger LOG = Logger.getLogger(MockParticipantManager.class);
-
-  private final CountDownLatch _startCountDown = new CountDownLatch(1);
-  private final CountDownLatch _stopCountDown = new CountDownLatch(1);
-  private final CountDownLatch _waitStopCompleteCountDown = new CountDownLatch(1);
-
-  private final MockMSModelFactory _msModelFactory = new MockMSModelFactory(null);
-
-  public MockParticipantManager(String zkAddr, String clusterName, String instanceName) {
-    super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
-  }
-
-  public void setTransition(MockTransition transition) {
-    _msModelFactory.setTrasition(transition);
-  }
-
-  public void syncStop() {
-    _stopCountDown.countDown();
-    try {
-      _waitStopCompleteCountDown.await();
-    } catch (InterruptedException e) {
-      LOG.error("exception in syncStop participant-manager", e);
-    }
-  }
-
-  public void syncStart() {
-    try {
-      new Thread(this).start();
-      _startCountDown.await();
-    } catch (InterruptedException e) {
-      LOG.error("exception in syncStart participant-manager", e);
-    }
-  }
-
-  @Override
-  public void run() {
-    try {
-      StateMachineEngine stateMach = getStateMachineEngine();
-      stateMach.registerStateModelFactory("MasterSlave", _msModelFactory);
-
-      DummyLeaderStandbyStateModelFactory lsModelFactory =
-          new DummyLeaderStandbyStateModelFactory(10);
-      DummyOnlineOfflineStateModelFactory ofModelFactory =
-          new DummyOnlineOfflineStateModelFactory(10);
-      stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
-      stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory);
-
-      MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory();
-      stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);
-
-      connect();
-      _startCountDown.countDown();
-
-      _stopCountDown.await();
-    } catch (InterruptedException e) {
-      String msg =
-          "participant: " + getInstanceName() + ", " + Thread.currentThread().getName()
-              + " is interrupted";
-      LOG.info(msg);
-    } catch (Exception e) {
-      LOG.error("exception running participant-manager", e);
-    } finally {
-      _startCountDown.countDown();
-
-      disconnect();
-      _waitStopCompleteCountDown.countDown();
-    }
-  }
-
-  @Override
-  public ZkClient getZkClient() {
-    return _zkclient;
-  }
-
-  @Override
-  public List<CallbackHandler> getHandlers() {
-    return _handlers;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
index 877cf3c..99986ef 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
@@ -24,13 +24,15 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixProperty;
 import org.apache.helix.PreConnectCallback;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
-import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockMultiClusterController;
+import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkCallbackHandler;
 import org.apache.helix.mock.participant.MockMSModelFactory;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.testutil.HelixTestUtil;
@@ -97,19 +99,18 @@ public class TestConsecutiveZkSessionExpiry extends ZkTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    final ClusterControllerManager controller =
-        new ClusterControllerManager(_zkaddr, clusterName, "controller");
+    final MockController controller = new MockController(_zkaddr, clusterName, "controller");
     controller.syncStart();
 
     // start participants
     CountDownLatch startCountdown = new CountDownLatch(1);
     CountDownLatch endCountdown = new CountDownLatch(1);
 
-    MockParticipantManager[] participants = new MockParticipantManager[n];
+    MockParticipant[] participants = new MockParticipant[n];
     for (int i = 0; i < n; i++) {
       final String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+      participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
 
       if (i == 0) {
         participants[i].addPreConnectCallback(new PreConnectTestCallback(instanceName,
@@ -178,14 +179,13 @@ public class TestConsecutiveZkSessionExpiry extends ZkTestBase {
         2, // replicas
         "MasterSlave", true); // do rebalance
 
-    ClusterDistributedController[] distributedControllers = new ClusterDistributedController[n];
+    MockMultiClusterController[] distributedControllers = new MockMultiClusterController[n];
     CountDownLatch startCountdown = new CountDownLatch(1);
     CountDownLatch endCountdown = new CountDownLatch(1);
 
     for (int i = 0; i < n; i++) {
       String contrllerName = "localhost_" + (12918 + i);
-      distributedControllers[i] =
-          new ClusterDistributedController(_zkaddr, clusterName, contrllerName);
+      distributedControllers[i] = new MockMultiClusterController(_zkaddr, clusterName, contrllerName);
       distributedControllers[i].getStateMachineEngine().registerStateModelFactory("MasterSlave",
           new MockMSModelFactory());
       if (i == 0) {
@@ -237,12 +237,15 @@ public class TestConsecutiveZkSessionExpiry extends ZkTestBase {
     Assert.assertNotNull(leader);
     Assert.assertEquals(leader.getId(), "localhost_12919");
 
-    // check localhost_12918 has 2 handlers: message and data-accessor
-    LOG.debug("handlers: " + TestHelper.printHandlers(distributedControllers[0]));
-    List<CallbackHandler> handlers = distributedControllers[0].getHandlers();
-    Assert.assertEquals(handlers.size(), 1,
-        "Distributed controller should have 1 handler (message) after lose leadership, but was "
-            + handlers.size());
+    // check localhost_12918 has 2 handlers: message and leader-election
+    TestHelper.printHandlers(distributedControllers[0], distributedControllers[0].getHandlers());
+    List<ZkCallbackHandler> handlers = distributedControllers[0].getHandlers();
+    Assert
+        .assertEquals(
+            handlers.size(),
+            2,
+            "Distributed controller should have 2 handler (message and leader election) after lose leadership, but was "
+                + handlers.size());
 
     // clean up
     distributedControllers[0].disconnect();

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/manager/TestControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestControllerManager.java
index 1544dc8..69d1dbe 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestControllerManager.java
@@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.testutil.ZkTestBase;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -53,10 +55,10 @@ public class TestControllerManager extends ZkTestBase {
 
     // start multiple controllers of same name
     int m = 3;
-    ClusterControllerManager[] controllers = new ClusterControllerManager[m];
+    MockController[] controllers = new MockController[m];
 
     for (int i = 0; i < m; i++) {
-      controllers[i] = new ClusterControllerManager(_zkaddr, clusterName, "controller");
+      controllers[i] = new MockController(_zkaddr, clusterName, "controller");
       controllers[i].syncStart();
     }
 
@@ -69,11 +71,11 @@ public class TestControllerManager extends ZkTestBase {
     }
     Assert.assertEquals(leaderCnt, 1, "Should have only 1 leader but was " + leaderCnt);
 
-    MockParticipantManager[] participants = new MockParticipantManager[n];
+    MockParticipant[] participants = new MockParticipant[n];
     for (int i = 0; i < n; i++) {
       final String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+      participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
 
       participants[i].syncStart();
     }
@@ -107,7 +109,7 @@ public class TestControllerManager extends ZkTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipantManager[] participants = new MockParticipantManager[n];
+    MockParticipant[] participants = new MockParticipant[n];
 
     TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
         "localhost", // participant name prefix
@@ -119,14 +121,14 @@ public class TestControllerManager extends ZkTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterControllerManager controller =
-        new ClusterControllerManager(_zkaddr, clusterName, "controller");
+    MockController controller =
+        new MockController(_zkaddr, clusterName, "controller");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+      participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
       participants[i].syncStart();
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
index f915c4f..18234b5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
@@ -27,9 +27,10 @@ import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
-import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.MockMultiClusterController;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZkCallbackHandler;
 import org.apache.helix.mock.participant.MockMSModelFactory;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.testutil.ZkTestBase;
@@ -109,8 +110,8 @@ public class TestDistributedControllerManager extends ZkTestBase {
    * @param newController
    * @throws Exception
    */
-  void expireController(ClusterDistributedController expireController,
-      ClusterDistributedController newController) throws Exception {
+  void expireController(MockMultiClusterController expireController,
+      MockMultiClusterController newController) throws Exception {
     String clusterName = expireController.getClusterName();
     LOG.info("Expiring distributedController: " + expireController.getInstanceName()
         + ", session: " + expireController.getSessionId() + " ...");
@@ -136,13 +137,12 @@ public class TestDistributedControllerManager extends ZkTestBase {
     Assert.assertNotNull(leader);
     Assert.assertEquals(leader.getId(), newController.getInstanceName());
 
-    // check expired-controller has 2 handlers: message and data-accessor
-    LOG.debug(expireController.getInstanceName() + " handlers: "
-        + TestHelper.printHandlers(expireController));
+    // check expired-controller has 2 handlers: message and leader-election
+    TestHelper.printHandlers(expireController, expireController.getHandlers());
 
-    List<CallbackHandler> handlers = expireController.getHandlers();
-    Assert.assertEquals(handlers.size(), 1,
-        "Distributed controller should have 1 handler (message) after lose leadership, but was "
+    List<ZkCallbackHandler> handlers = expireController.getHandlers();
+    Assert.assertEquals(handlers.size(), 2,
+        "Distributed controller should have 2 handler (message and leader-election) after lose leadership, but was "
             + handlers.size());
   }
 
@@ -165,12 +165,12 @@ public class TestDistributedControllerManager extends ZkTestBase {
         2, // replicas
         "MasterSlave", true); // do rebalance
 
-    ClusterDistributedController[] distributedControllers = new ClusterDistributedController[n];
+    MockMultiClusterController[] distributedControllers = new MockMultiClusterController[n];
 
     for (int i = 0; i < n; i++) {
       String contrllerName = "localhost_" + (12918 + i);
       distributedControllers[i] =
-          new ClusterDistributedController(_zkaddr, clusterName, contrllerName);
+          new MockMultiClusterController(_zkaddr, clusterName, contrllerName);
       distributedControllers[i].getStateMachineEngine().registerStateModelFactory("MasterSlave",
           new MockMSModelFactory());
       distributedControllers[i].connect();

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
index 4d46883..309ab18 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
@@ -34,6 +34,8 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -110,7 +112,7 @@ public class TestParticipantManager extends ZkTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipantManager[] participants = new MockParticipantManager[n];
+    MockParticipant[] participants = new MockParticipant[n];
 
     TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
         "localhost", // participant name prefix
@@ -122,14 +124,14 @@ public class TestParticipantManager extends ZkTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterControllerManager controller =
-        new ClusterControllerManager(_zkaddr, clusterName, "controller_0");
+    MockController controller =
+        new MockController(_zkaddr, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+      participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -197,7 +199,7 @@ public class TestParticipantManager extends ZkTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipantManager[] participants = new MockParticipantManager[n];
+    MockParticipant[] participants = new MockParticipant[n];
 
     TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
         "localhost", // participant name prefix
@@ -209,14 +211,14 @@ public class TestParticipantManager extends ZkTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterControllerManager controller =
-        new ClusterControllerManager(_zkaddr, clusterName, "controller_0");
+    MockController controller =
+        new MockController(_zkaddr, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+      participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
       participants[i].setTransition(new SessionExpiryTransition(startCountdown, endCountdown));
       participants[i].syncStart();
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
index b82f156..d6d7bab 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
@@ -28,6 +28,8 @@ import java.util.TreeMap;
 
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.mock.participant.ErrTransition;
 import org.apache.helix.participant.HelixStateMachineEngine;
@@ -70,15 +72,15 @@ public class TestStateModelLeak extends ZkTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterControllerManager controller =
-        new ClusterControllerManager(_zkaddr, clusterName, "controller");
+    MockController controller =
+        new MockController(_zkaddr, clusterName, "controller");
     controller.syncStart();
 
-    MockParticipantManager[] participants = new MockParticipantManager[n];
+    MockParticipant[] participants = new MockParticipant[n];
     for (int i = 0; i < n; i++) {
       final String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+      participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -144,15 +146,15 @@ public class TestStateModelLeak extends ZkTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterControllerManager controller =
-        new ClusterControllerManager(_zkaddr, clusterName, "controller");
+    MockController controller =
+        new MockController(_zkaddr, clusterName, "controller");
     controller.syncStart();
 
-    MockParticipantManager[] participants = new MockParticipantManager[n];
+    MockParticipant[] participants = new MockParticipant[n];
     for (int i = 0; i < n; i++) {
       final String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+      participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
       if (i == 0) {
         Map<String, Set<String>> errTransitionMap = new HashMap<String, Set<String>>();
         Set<String> partitions = new HashSet<String>();

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
index 650f13f..1393231 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
@@ -31,6 +31,8 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.testutil.ZkTestBase;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -61,16 +63,15 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    final ClusterControllerManager controller =
-        new ClusterControllerManager(_zkaddr, clusterName, "controller");
+    final MockController controller = new MockController(_zkaddr, clusterName, "controller");
     controller.connect();
 
     // start participants
-    MockParticipantManager[] participants = new MockParticipantManager[n];
+    MockParticipant[] participants = new MockParticipant[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+      participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -98,7 +99,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
     Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers.");
 
     // check participant zk-watchers
-    final MockParticipantManager participantManagerToExpire = participants[0];
+    final MockParticipant participantManagerToExpire = participants[0];
     result = TestHelper.verify(new TestHelper.Verifier() {
 
       @Override
@@ -118,8 +119,8 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
     // printHandlers(participantManagerToExpire);
     int controllerHandlerNb = controller.getHandlers().size();
     int particHandlerNb = participantManagerToExpire.getHandlers().size();
-    Assert.assertEquals(controllerHandlerNb, (5 + 2 * n),
-        "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant");
+    Assert.assertEquals(controllerHandlerNb, (6 + 2 * n),
+        "HelixController should have 10 (6+2n) callback handlers for 2 (n) participant");
     Assert.assertEquals(particHandlerNb, 1,
         "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers");
 
@@ -200,16 +201,15 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
         2, // replicas
         "MasterSlave", true); // do rebalance
 
-    final ClusterControllerManager controller =
-        new ClusterControllerManager(_zkaddr, clusterName, "controller");
+    final MockController controller = new MockController(_zkaddr, clusterName, "controller");
     controller.syncStart();
 
     // start participants
-    MockParticipantManager[] participants = new MockParticipantManager[n];
+    MockParticipant[] participants = new MockParticipant[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipantManager(_zkaddr, clusterName, instanceName);
+      participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -220,14 +220,14 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
     Assert.assertTrue(result);
 
     // wait until we get all the listeners registered
-    final MockParticipantManager participantManager = participants[0];
+    final MockParticipant participantManager = participants[0];
     result = TestHelper.verify(new TestHelper.Verifier() {
 
       @Override
       public boolean verify() throws Exception {
         int controllerHandlerNb = controller.getHandlers().size();
         int particHandlerNb = participantManager.getHandlers().size();
-        if (controllerHandlerNb == 9 && particHandlerNb == 2)
+        if (controllerHandlerNb == 10 && particHandlerNb == 2)
           return true;
         else
           return false;
@@ -236,12 +236,14 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
 
     int controllerHandlerNb = controller.getHandlers().size();
     int particHandlerNb = participantManager.getHandlers().size();
-    Assert.assertEquals(controllerHandlerNb, (5 + 2 * n),
-        "HelixController should have 9 (5+2n) callback handlers for 2 participant, but was "
-            + controllerHandlerNb + ", " + TestHelper.printHandlers(controller));
+    TestHelper.printHandlers(controller, controller.getHandlers());
+    TestHelper.printHandlers(participantManager, participantManager.getHandlers());
+    Assert.assertEquals(controllerHandlerNb, (6 + 2 * n),
+        "HelixController should have 10 (6+2n) callback handlers for 2 participant, but was "
+            + controllerHandlerNb);
     Assert.assertEquals(particHandlerNb, 1,
         "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handler, but was "
-            + particHandlerNb + ", " + TestHelper.printHandlers(participantManager));
+            + particHandlerNb);
 
     // expire controller
     LOG.debug("Expiring controller session...");
@@ -292,13 +294,13 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
     // check HelixManager#_handlers
     // printHandlers(controllerManager);
     int handlerNb = controller.getHandlers().size();
+    TestHelper.printHandlers(controller, controller.getHandlers());
     Assert.assertEquals(handlerNb, controllerHandlerNb,
-        "controller callback handlers should not increase after participant session expiry, but was "
-            + TestHelper.printHandlers(controller));
+        "controller callback handlers should not increase after participant session expiry");
     handlerNb = participantManager.getHandlers().size();
+    TestHelper.printHandlers(participantManager, participantManager.getHandlers());
     Assert.assertEquals(handlerNb, particHandlerNb,
-        "participant callback handlers should not increase after participant session expiry, but was "
-            + TestHelper.printHandlers(participantManager));
+        "participant callback handlers should not increase after participant session expiry");
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
@@ -318,19 +320,18 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
         2, // replicas
         "MasterSlave", true);
 
-    final ClusterControllerManager controller =
-        new ClusterControllerManager(zkAddr, clusterName, "controller");
+    final MockController controller = new MockController(zkAddr, clusterName, "controller");
     controller.syncStart();
 
-    MockParticipantManager[] participants = new MockParticipantManager[n];
+    MockParticipant[] participants = new MockParticipant[n];
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      participants[i] = new MockParticipantManager(zkAddr, clusterName, instanceName);
+      participants[i] = new MockParticipant(zkAddr, clusterName, instanceName);
       participants[i].syncStart();
 
       // register a controller listener on participant_0
       if (i == 0) {
-        MockParticipantManager manager = participants[0];
+        MockParticipant manager = participants[0];
         manager.addCurrentStateChangeListener(new CurrentStateChangeListener() {
           @Override
           public void onStateChange(String instanceName, List<CurrentState> statesInfo,
@@ -349,7 +350,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
                 clusterName));
     Assert.assertTrue(result);
 
-    MockParticipantManager participantToExpire = participants[0];
+    MockParticipant participantToExpire = participants[0];
     String oldSessionId = participantToExpire.getSessionId();
     PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
 
@@ -371,9 +372,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
     Assert.assertEquals(dataListeners.get(path).size(), 1, "Should have 1 data-listeners on path: "
         + path);
     Assert
-        .assertEquals(
-            childListeners.size(),
-            2,
+        .assertEquals(childListeners.size(), 2,
             "Should have 2 paths (CURRENTSTATE/{sessionId}, and MESSAGES) each of which has 1 child-listener");
     path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath();
     Assert.assertEquals(childListeners.get(path).size(), 1,
@@ -382,17 +381,14 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
     Assert.assertEquals(childListeners.get(path).size(), 1,
         "Should have 1 child-listener on path: " + path);
     path = keyBuilder.controller().getPath();
-    Assert.assertNull(childListeners.get(path),
-        "Should have no child-listener on path: " + path);
+    Assert.assertNull(childListeners.get(path), "Should have no child-listener on path: " + path);
 
     // check zookeeper#watches on client side
     Map<String, List<String>> watchPaths =
         ZkTestHelper.getZkWatch(participantToExpire.getZkClient());
     LOG.debug("localhost_12918 zk-client side watchPaths: " + watchPaths);
     Assert
-        .assertEquals(
-            watchPaths.get("dataWatches").size(),
-            3,
+        .assertEquals(watchPaths.get("dataWatches").size(), 3,
             "Should have 3 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, MESSAGES");
     Assert.assertEquals(watchPaths.get("childWatches").size(), 2,
         "Should have 2 child-watches: MESSAGES, and CURRENTSTATE/{sessionId}");
@@ -435,8 +431,7 @@ public class TestZkCallbackHandlerLeak extends ZkTestBase {
     Assert.assertEquals(childListeners.get(path).size(), 1,
         "Should have 1 child-listener on path: " + path);
     path = keyBuilder.controller().getPath();
-    Assert.assertNull(childListeners.get(path),
-        "Should have no child-listener on path: " + path);
+    Assert.assertNull(childListeners.get(path), "Should have no child-listener on path: " + path);
 
     // check zookeeper#watches on client side
     watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient());

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 2e2c8b6..22beed1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -30,9 +30,9 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.integration.task.TestTaskRebalancerStopResume.ReindexTask;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.ScheduleConfig;
@@ -63,8 +63,8 @@ public class TestIndependentTaskRebalancer extends ZkTestBase {
   private static final int n = 5;
   private static final int START_PORT = 12918;
   private final String CLUSTER_NAME = "TestIndependentTaskRebalancer";
-  private final MockParticipantManager[] _participants = new MockParticipantManager[n];
-  private ClusterControllerManager _controller;
+  private final MockParticipant[] _participants = new MockParticipant[n];
+  private MockController _controller;
   private Set<String> _invokedClasses = Sets.newHashSet();
   private Map<String, Integer> _runCounts = Maps.newHashMap();
 
@@ -104,7 +104,7 @@ public class TestIndependentTaskRebalancer extends ZkTestBase {
         }
       });
 
-      _participants[i] = new MockParticipantManager(_zkaddr, CLUSTER_NAME, instanceName);
+      _participants[i] = new MockParticipant(_zkaddr, CLUSTER_NAME, instanceName);
 
       // Register a Task state model factory.
       StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
@@ -115,7 +115,7 @@ public class TestIndependentTaskRebalancer extends ZkTestBase {
 
     // Start controller
     String controllerName = "controller_0";
-    _controller = new ClusterControllerManager(_zkaddr, CLUSTER_NAME, controllerName);
+    _controller = new MockController(_zkaddr, CLUSTER_NAME, controllerName);
     _controller.syncStart();
 
     // Start an admin connection

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index e39615d..84c0e1d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -30,8 +30,8 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
@@ -66,8 +66,8 @@ public class TestTaskRebalancer extends ZkTestBase {
   private static final int NUM_PARTITIONS = 20;
   private static final int NUM_REPLICAS = 3;
   private final String CLUSTER_NAME = "TestTaskRebalancer";
-  private final MockParticipantManager[] _participants = new MockParticipantManager[n];
-  private ClusterControllerManager _controller;
+  private final MockParticipant[] _participants = new MockParticipant[n];
+  private MockController _controller;
 
   private HelixManager _manager;
   private TaskDriver _driver;
@@ -102,7 +102,7 @@ public class TestTaskRebalancer extends ZkTestBase {
     // start dummy participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (START_PORT + i);
-      _participants[i] = new MockParticipantManager(_zkaddr, CLUSTER_NAME, instanceName);
+      _participants[i] = new MockParticipant(_zkaddr, CLUSTER_NAME, instanceName);
 
       // Register a Task state model factory.
       StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
@@ -113,7 +113,7 @@ public class TestTaskRebalancer extends ZkTestBase {
 
     // start controller
     String controllerName = "controller_0";
-    _controller = new ClusterControllerManager(_zkaddr, CLUSTER_NAME, controllerName);
+    _controller = new MockController(_zkaddr, CLUSTER_NAME, controllerName);
     _controller.syncStart();
 
     // create cluster manager

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 6de361d..e3e8b89 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -27,8 +27,8 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.MockController;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.Task;
@@ -60,8 +60,8 @@ public class TestTaskRebalancerStopResume extends ZkTestBase {
   private static final int NUM_PARTITIONS = 20;
   private static final int NUM_REPLICAS = 3;
   private final String CLUSTER_NAME = "TestTaskRebalancerStopResume";
-  private final MockParticipantManager[] _participants = new MockParticipantManager[n];
-  private ClusterControllerManager _controller;
+  private final MockParticipant[] _participants = new MockParticipant[n];
+  private MockController _controller;
 
   private HelixManager _manager;
   private TaskDriver _driver;
@@ -94,7 +94,7 @@ public class TestTaskRebalancerStopResume extends ZkTestBase {
     // start dummy participants
     for (int i = 0; i < n; i++) {
       String instanceName = "localhost_" + (START_PORT + i);
-      _participants[i] = new MockParticipantManager(_zkaddr, CLUSTER_NAME, instanceName);
+      _participants[i] = new MockParticipant(_zkaddr, CLUSTER_NAME, instanceName);
 
       // Register a Task state model factory.
       StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
@@ -106,7 +106,7 @@ public class TestTaskRebalancerStopResume extends ZkTestBase {
 
     // start controller
     String controllerName = "controller_0";
-    _controller = new ClusterControllerManager(_zkaddr, CLUSTER_NAME, controllerName);
+    _controller = new MockController(_zkaddr, CLUSTER_NAME, controllerName);
     _controller.syncStart();
 
     // create cluster manager

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/MockListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/MockListener.java b/helix-core/src/test/java/org/apache/helix/manager/MockListener.java
index 376481e..d6edf17 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/MockListener.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/MockListener.java
@@ -21,11 +21,11 @@ package org.apache.helix.manager;
 
 import java.util.List;
 
-import org.apache.helix.ConfigChangeListener;
 import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.CurrentStateChangeListener;
 import org.apache.helix.ExternalViewChangeListener;
 import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceConfigChangeListener;
 import org.apache.helix.LiveInstanceChangeListener;
 import org.apache.helix.MessageListener;
 import org.apache.helix.NotificationContext;
@@ -37,7 +37,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 
 public class MockListener implements IdealStateChangeListener, LiveInstanceChangeListener,
-    ConfigChangeListener, CurrentStateChangeListener, ExternalViewChangeListener,
+    InstanceConfigChangeListener, CurrentStateChangeListener, ExternalViewChangeListener,
     ControllerChangeListener, MessageListener
 
 {
@@ -71,7 +71,7 @@ public class MockListener implements IdealStateChangeListener, LiveInstanceChang
   }
 
   @Override
-  public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) {
+  public void onInstanceConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) {
     isConfigChangeListenerInvoked = true;
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/MockController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/MockController.java b/helix-core/src/test/java/org/apache/helix/manager/zk/MockController.java
new file mode 100644
index 0000000..f4d2159
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/MockController.java
@@ -0,0 +1,116 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.InstanceType;
+import org.apache.log4j.Logger;
+
+/**
+ * Test helper that runs a Helix manager of type {@link InstanceType#CONTROLLER} on a dedicated
+ * thread, with blocking start and stop calls.
+ */
+public class MockController extends ZKHelixManager implements Runnable {
+  private static final Logger LOG = Logger.getLogger(MockController.class);
+
+  /** Released once {@code connect()} has returned or failed. */
+  private final CountDownLatch _startCountDown = new CountDownLatch(1);
+  /** Released by {@link #syncStop()} to let the controller thread proceed to shutdown. */
+  private final CountDownLatch _stopCountDown = new CountDownLatch(1);
+  /** Released by the controller thread after its {@code disconnect()} attempt. */
+  private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
+
+  public MockController(String zkAddr, String clusterName, String controllerName) {
+    super(clusterName, controllerName, InstanceType.CONTROLLER, zkAddr);
+  }
+
+  /**
+   * Signals the controller thread to stop and blocks until it has finished disconnecting. If the
+   * calling thread is interrupted while waiting, the interrupt status is restored and the method
+   * returns without waiting further.
+   */
+  public void syncStop() {
+    _stopCountDown.countDown();
+    try {
+      _waitStopFinishCountDown.await();
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted waiting for finish", e);
+      // keep the interrupt visible to the caller instead of silently clearing it
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Starts the controller on a new thread and blocks until the connect attempt has completed,
+   * successfully or not. If the calling thread is interrupted while waiting, the interrupt status
+   * is restored and the method returns early.
+   */
+  public void syncStart() {
+    // TODO: prevent start multiple times
+    new Thread(this).start();
+    try {
+      _startCountDown.await();
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted waiting for start", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Connects, waits for the stop signal, then disconnects. Both waiting latches are always
+   * released so that {@link #syncStart()} and {@link #syncStop()} are unblocked even on failure.
+   */
+  @Override
+  public void run() {
+    try {
+      connect();
+      _startCountDown.countDown();
+      _stopCountDown.await();
+    } catch (Exception e) {
+      LOG.error("exception running controller-manager", e);
+    } finally {
+      // no-op after a successful connect; unblocks syncStart() when connect() failed
+      _startCountDown.countDown();
+      try {
+        disconnect();
+      } finally {
+        // release syncStop() even if disconnect() throws
+        _waitStopFinishCountDown.countDown();
+      }
+    }
+  }
+
+  /** Returns the ZooKeeper client held by this controller's connection. */
+  public ZkClient getZkClient() {
+    return getConn()._zkclient;
+  }
+
+  /** Returns the connection behind this manager's role, cast to {@link ZkHelixConnection}. */
+  public ZkHelixConnection getConn() {
+    return (ZkHelixConnection)_role.getConnection();
+  }
+
+  /** Returns the callback handlers the connection has recorded for this manager's role. */
+  public List<ZkCallbackHandler> getHandlers() {
+    return getConn()._handlers.get(_role);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/MockMultiClusterController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/MockMultiClusterController.java b/helix-core/src/test/java/org/apache/helix/manager/zk/MockMultiClusterController.java
new file mode 100644
index 0000000..7f8b1a3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/MockMultiClusterController.java
@@ -0,0 +1,133 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.InstanceType;
+import org.apache.helix.participant.DistClusterControllerStateModelFactory;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.log4j.Logger;
+
+/**
+ * Test helper that runs a Helix manager of type {@link InstanceType#CONTROLLER_PARTICIPANT} on a
+ * dedicated thread, with blocking start and stop calls.
+ */
+public class MockMultiClusterController extends ZKHelixManager implements Runnable {
+  private static final Logger LOG = Logger.getLogger(MockMultiClusterController.class);
+
+  /** Released once {@code connect()} has returned or startup has failed. */
+  private final CountDownLatch _startCountDown = new CountDownLatch(1);
+  /** Released by {@link #syncStop()} to let the controller thread proceed to shutdown. */
+  private final CountDownLatch _stopCountDown = new CountDownLatch(1);
+  /** Released by the controller thread after its {@code disconnect()} attempt. */
+  private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
+
+  public MockMultiClusterController(String zkAddr, String clusterName, String controllerName) {
+    super(clusterName, controllerName, InstanceType.CONTROLLER_PARTICIPANT, zkAddr);
+  }
+
+  /**
+   * Signals the controller thread to stop and blocks until it has finished disconnecting. If the
+   * calling thread is interrupted while waiting, the interrupt status is restored and the method
+   * returns without waiting further.
+   */
+  public void syncStop() {
+    _stopCountDown.countDown();
+    try {
+      _waitStopFinishCountDown.await();
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted waiting for finish", e);
+      // keep the interrupt visible to the caller instead of silently clearing it
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Starts the controller on a new thread and blocks until the connect attempt has completed,
+   * successfully or not. If the calling thread is interrupted while waiting, the interrupt status
+   * is restored and the method returns early.
+   */
+  public void syncStart() {
+    // TODO: prevent start multiple times
+    new Thread(this).start();
+    try {
+      _startCountDown.await();
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted waiting for start", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Registers the "LeaderStandby" state model factory, connects, waits for the stop signal, then
+   * disconnects. Both waiting latches are always released so that {@link #syncStart()} and
+   * {@link #syncStop()} are unblocked even when an exception occurs.
+   */
+  @Override
+  public void run() {
+    try {
+      StateMachineEngine stateMach = getStateMachineEngine();
+      DistClusterControllerStateModelFactory lsModelFactory =
+          new DistClusterControllerStateModelFactory(_zkAddress);
+      stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
+
+      connect();
+      _startCountDown.countDown();
+      _stopCountDown.await();
+    } catch (Exception e) {
+      LOG.error("exception running controller-manager", e);
+    } finally {
+      // no-op after a successful connect; unblocks syncStart() when startup failed
+      _startCountDown.countDown();
+      try {
+        disconnect();
+      } finally {
+        // release syncStop() even if disconnect() throws
+        _waitStopFinishCountDown.countDown();
+      }
+    }
+  }
+
+  /** Returns the connection behind this manager's role, cast to {@link ZkHelixConnection}. */
+  public ZkHelixConnection getConn() {
+    return (ZkHelixConnection)_role.getConnection();
+  }
+
+  /** Returns the ZooKeeper client held by this controller's connection. */
+  public ZkClient getZkClient() {
+    return getConn()._zkclient;
+  }
+
+  /**
+   * Returns a new list holding every callback handler recorded on the connection, across all
+   * entries of its handler map (not only those of this manager's role).
+   */
+  public List<ZkCallbackHandler> getHandlers() {
+    List<ZkCallbackHandler> handlers = new ArrayList<ZkCallbackHandler>();
+    for (List<ZkCallbackHandler> handlerList : getConn()._handlers.values()) {
+      handlers.addAll(handlerList);
+    }
+
+    return handlers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/MockParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/MockParticipant.java b/helix-core/src/test/java/org/apache/helix/manager/zk/MockParticipant.java
new file mode 100644
index 0000000..f107d3d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/MockParticipant.java
@@ -0,0 +1,152 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.HelixConnection;
+import org.apache.helix.InstanceType;
+import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
+import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
+import org.apache.helix.mock.participant.MockMSModelFactory;
+import org.apache.helix.mock.participant.MockSchemataModelFactory;
+import org.apache.helix.mock.participant.MockTransition;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.log4j.Logger;
+
+/**
+ * Test helper that runs a Helix manager of type {@link InstanceType#PARTICIPANT} on a dedicated
+ * thread, with blocking start and stop calls.
+ */
+public class MockParticipant extends ZKHelixManager implements Runnable {
+  private static final Logger LOG = Logger.getLogger(MockParticipant.class);
+
+  /** Released once {@code connect()} has returned or startup has failed. */
+  private final CountDownLatch _startCountDown = new CountDownLatch(1);
+  /** Released by {@link #syncStop()} to let the participant thread proceed to shutdown. */
+  private final CountDownLatch _stopCountDown = new CountDownLatch(1);
+  /** Released by the participant thread after its {@code disconnect()} attempt. */
+  private final CountDownLatch _waitStopCompleteCountDown = new CountDownLatch(1);
+
+  /** Factory registered for the "MasterSlave" state model; constructed with {@code null}. */
+  private final MockMSModelFactory _msModelFactory = new MockMSModelFactory(null);
+
+  public MockParticipant(String zkAddress, String clusterName, String instanceName) {
+    super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddress);
+  }
+
+  /** Forwards {@code transition} to the "MasterSlave" state model factory. */
+  public void setTransition(MockTransition transition) {
+    _msModelFactory.setTrasition(transition);
+  }
+
+  /**
+   * Signals the participant thread to stop and blocks until it has finished disconnecting. If the
+   * calling thread is interrupted while waiting, the interrupt status is restored and the method
+   * returns without waiting further.
+   */
+  public void syncStop() {
+    _stopCountDown.countDown();
+    try {
+      _waitStopCompleteCountDown.await();
+    } catch (InterruptedException e) {
+      LOG.error("exception in syncStop participant-manager", e);
+      // keep the interrupt visible to the caller instead of silently clearing it
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Starts the participant on a new thread and blocks until the connect attempt has completed,
+   * successfully or not. If the calling thread is interrupted while waiting, the interrupt status
+   * is restored and the method returns early.
+   */
+  public void syncStart() {
+    try {
+      new Thread(this).start();
+      _startCountDown.await();
+    } catch (InterruptedException e) {
+      LOG.error("exception in syncStart participant-manager", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Registers the mock state model factories, connects, waits for the stop signal, then
+   * disconnects. Both waiting latches are always released so that {@link #syncStart()} and
+   * {@link #syncStop()} are unblocked even when an exception occurs.
+   */
+  @Override
+  public void run() {
+    try {
+      StateMachineEngine stateMach = getStateMachineEngine();
+      stateMach.registerStateModelFactory("MasterSlave", _msModelFactory);
+
+      DummyLeaderStandbyStateModelFactory lsModelFactory =
+          new DummyLeaderStandbyStateModelFactory(10);
+      DummyOnlineOfflineStateModelFactory ofModelFactory =
+          new DummyOnlineOfflineStateModelFactory(10);
+      stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
+      stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory);
+
+      MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory();
+      stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);
+
+      connect();
+      _startCountDown.countDown();
+
+      _stopCountDown.await();
+    } catch (InterruptedException e) {
+      String msg =
+          "participant: " + getInstanceName() + ", " + Thread.currentThread().getName()
+              + " is interrupted";
+      LOG.info(msg);
+    } catch (Exception e) {
+      LOG.error("exception running participant-manager", e);
+    } finally {
+      // no-op after a successful connect; unblocks syncStart() when startup failed
+      _startCountDown.countDown();
+      try {
+        disconnect();
+      } finally {
+        // release syncStop() even if disconnect() throws
+        _waitStopCompleteCountDown.countDown();
+      }
+    }
+  }
+
+  /** Returns the connection behind this manager's role. */
+  public HelixConnection getConn() {
+    return _role.getConnection();
+  }
+
+  /** Returns the ZooKeeper client held by this participant's connection. */
+  public ZkClient getZkClient() {
+    ZkHelixConnection conn = (ZkHelixConnection)getConn();
+    return conn._zkclient;
+  }
+
+  /** Returns the callback handlers the connection has recorded for this manager's role. */
+  public List<ZkCallbackHandler> getHandlers() {
+    ZkHelixConnection conn = (ZkHelixConnection)getConn();
+    return conn._handlers.get(_role);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
index ca0d4ab..3b5b24e 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
@@ -23,7 +23,6 @@ import java.util.Date;
 
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
-import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.testutil.ZkTestBase;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -47,8 +46,8 @@ public class TestHandleNewSession extends ZkTestBase {
         3, // replicas
         "MasterSlave", true); // do rebalance
 
-    MockParticipantManager participant =
-        new MockParticipantManager(_zkaddr, clusterName, "localhost_12918");
+    MockParticipant participant =
+        new MockParticipant(_zkaddr, clusterName, "localhost_12918");
     participant.syncStart();
 
     // Logger.getRootLogger().setLevel(Level.INFO);

http://git-wip-us.apache.org/repos/asf/helix/blob/02165c52/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
index e59dd0c..7e2f4a7 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
@@ -20,7 +20,6 @@ package org.apache.helix.manager.zk;
  */
 
 import org.apache.helix.integration.ZkStandAloneCMTestBase;
-import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -40,7 +39,7 @@ public class TestLiveInstanceBounce extends ZkStandAloneCMTestBase {
         e.printStackTrace();
       }
       // restart the participant
-      _participants[i] = new MockParticipantManager(_zkaddr, CLUSTER_NAME, instanceName);
+      _participants[i] = new MockParticipant(_zkaddr, CLUSTER_NAME, instanceName);
       _participants[i].syncStart();
       Thread.sleep(100);
     }