You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/11/11 22:09:57 UTC

[03/10] [HELIX-279] Apply gc handling fixes to ZKHelixManager

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 8558b18..30f5807 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -19,6 +19,7 @@ package org.apache.helix.integration;
  * under the License.
  */
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -28,20 +29,19 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.helix.Criteria;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.api.id.MessageId;
-import org.apache.helix.api.id.SessionId;
+import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 import org.apache.helix.messaging.AsyncCallback;
 import org.apache.helix.messaging.handling.HelixTaskResult;
@@ -52,8 +52,13 @@ import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StatusUpdate;
 import org.apache.helix.monitoring.ZKPathDataDumpTask;
 import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.SerializationConfig;
 import org.testng.Assert;
@@ -70,6 +75,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     @Override
     public void onTimeOut() {
       // TODO Auto-generated method stub
+
     }
 
     @Override
@@ -78,20 +84,13 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     }
   }
 
-  TestMessagingHandlerFactory _factory = new TestMessagingHandlerFactory();
+  final TestMessagingHandlerFactory _factory = new TestMessagingHandlerFactory();
 
   public static class TestMessagingHandlerFactory implements MessageHandlerFactory {
-    int cnt;
     public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
 
-    public TestMessagingHandlerFactory() {
-      super();
-      cnt = 0;
-    }
-
     @Override
     public MessageHandler createHandler(Message message, NotificationContext context) {
-      // System.out.println("\t create-hdlr: " + message.getId());
       return new TestMessagingHandler(message, context);
     }
 
@@ -116,20 +115,73 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
       public HelixTaskResult handleMessage() throws InterruptedException {
         HelixTaskResult result = new HelixTaskResult();
         result.setSuccess(true);
-        // String tgtName = _message.getTgtName();
-        String messageId = _message.getMessageId().stringify();
-        String partitionId = _message.getPartitionId().stringify();
+        String destName = _message.getTgtName();
+        String partitionName = _message.getPartitionName();
+        result.getTaskResultMap().put("Message", _message.getMsgId());
+        synchronized (_results) {
+          if (!_results.containsKey(partitionName)) {
+            _results.put(partitionName, new HashSet<String>());
+          }
+          _results.get(partitionName).add(_message.getMsgId());
+        }
+        // System.err.println("handle msg: " + _message.getPartitionName() + ", from: "
+        // + _message.getFromState() + ", to: " + _message.getToState());
+        return result;
+      }
 
-        result.getTaskResultMap().put("Message", messageId);
+      @Override
+      public void onError(Exception e, ErrorCode code, ErrorType type) {
+        // TODO Auto-generated method stub
+      }
+    }
+  }
+
+  public static class TestMessagingHandlerFactoryLatch implements MessageHandlerFactory {
+    public volatile CountDownLatch _latch = new CountDownLatch(1);
+    public int _messageCount = 0;
+    public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
+
+    @Override
+    public synchronized MessageHandler createHandler(Message message, NotificationContext context) {
+      _messageCount++;
+      return new TestMessagingHandlerLatch(message, context);
+    }
+
+    public synchronized void signal() {
+      _latch.countDown();
+      _latch = new CountDownLatch(1);
+    }
+
+    @Override
+    public String getMessageType() {
+      return "TestMessagingHandlerLatch";
+    }
+
+    @Override
+    public void reset() {
+      // TODO Auto-generated method stub
+    }
+
+    public class TestMessagingHandlerLatch extends MessageHandler {
+      public TestMessagingHandlerLatch(Message message, NotificationContext context) {
+        super(message, context);
+        // TODO Auto-generated constructor stub
+      }
+
+      @Override
+      public HelixTaskResult handleMessage() throws InterruptedException {
+        _latch.await();
+        HelixTaskResult result = new HelixTaskResult();
+        result.setSuccess(true);
+        result.getTaskResultMap().put("Message", _message.getMsgId());
+        String destName = _message.getTgtName();
         synchronized (_results) {
-          if (!_results.containsKey(partitionId)) {
-            _results.put(partitionId, new HashSet<String>());
+          if (!_results.containsKey(_message.getPartitionName())) {
+            _results.put(_message.getPartitionName(), new ConcurrentSkipListSet<String>());
           }
-          _results.get(partitionId).add(messageId);
         }
-        cnt++;
-        // System.err.println(cnt + ": message " + messageId + ", tgtName: " + tgtName
-        // + ", partitionId: " + partitionId);
+        _results.get(_message.getPartitionName()).add(destName);
+        // System.err.println("Message " + _message.getMsgId() + " executed");
         return result;
       }
 
@@ -141,28 +193,116 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
   }
 
   @Test()
+  public void testSchedulerMsgUsingQueue() throws Exception {
+    Logger.getRootLogger().setLevel(Level.INFO);
+    _factory._results.clear();
+    HelixManager manager = null;
+    for (int i = 0; i < NODE_NR; i++) {
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
+          _factory.getMessageType(), _factory);
+
+      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
+    }
+
+    Message schedulerMessage =
+        new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
+    schedulerMessage.setTgtSessionId("*");
+    schedulerMessage.setTgtName("CONTROLLER");
+    // TODO: change it to "ADMIN" ?
+    schedulerMessage.setSrcName("CONTROLLER");
+    schedulerMessage.getRecord().setSimpleField(
+        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg");
+    // Template for the individual message sent to each participant
+    Message msg = new Message(_factory.getMessageType(), "Template");
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+
+    // Criteria to send individual messages
+    Criteria cr = new Criteria();
+    cr.setInstanceName("localhost_%");
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setResource("%");
+    cr.setPartition("%");
+
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    StringWriter sw = new StringWriter();
+    mapper.writeValue(sw, cr);
+
+    String crString = sw.toString();
+
+    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+    schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
+    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+
+    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = helixDataAccessor.keyBuilder();
+    helixDataAccessor.createProperty(keyBuilder.controllerMessage(schedulerMessage.getMsgId()),
+        schedulerMessage);
+
+    for (int i = 0; i < 30; i++) {
+      Thread.sleep(2000);
+      if (_PARTITIONS == _factory._results.size()) {
+        break;
+      }
+    }
+
+    Assert.assertEquals(_PARTITIONS, _factory._results.size());
+    PropertyKey controllerTaskStatus =
+        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+            schedulerMessage.getMsgId());
+
+    int messageResultCount = 0;
+    for (int i = 0; i < 10; i++) {
+      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+      Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
+          .equals("" + (_PARTITIONS * 3)));
+      for (String key : statusUpdate.getMapFields().keySet()) {
+        if (key.startsWith("MessageResult ")) {
+          messageResultCount++;
+        }
+      }
+      if (messageResultCount == _PARTITIONS * 3) {
+        break;
+      } else {
+        Thread.sleep(2000);
+      }
+    }
+    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+    int count = 0;
+    for (Set<String> val : _factory._results.values()) {
+      count += val.size();
+    }
+    Assert.assertEquals(count, _PARTITIONS * 3);
+
+  }
+
+  @Test()
   public void testSchedulerMsg() throws Exception {
-    // Logger.getRootLogger().setLevel(Level.INFO);
+    Logger.getRootLogger().setLevel(Level.INFO);
     _factory._results.clear();
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           _factory.getMessageType(), _factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
+
+      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
     }
 
     Message schedulerMessage =
-        new Message(MessageType.SCHEDULER_MSG + "", MessageId.from(UUID.randomUUID().toString()));
-    schedulerMessage.setTgtSessionId(SessionId.from("*"));
+        new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
+    schedulerMessage.setTgtSessionId("*");
     schedulerMessage.setTgtName("CONTROLLER");
     // TODO: change it to "ADMIN" ?
     schedulerMessage.setSrcName("CONTROLLER");
     // schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE,
     // "TestSchedulerMsg");
     // Template for the individual message sent to each participant
-    Message msg = new Message(_factory.getMessageType(), MessageId.from("Template"));
-    msg.setTgtSessionId(SessionId.from("*"));
+    Message msg = new Message(_factory.getMessageType(), "Template");
+    msg.setTgtSessionId("*");
     msg.setMsgState(MessageState.NEW);
 
     // Criteria to send individual messages
@@ -188,8 +328,8 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
 
     HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
     Builder keyBuilder = helixDataAccessor.keyBuilder();
-    helixDataAccessor.createProperty(
-        keyBuilder.controllerMessage(schedulerMessage.getMessageId().stringify()), schedulerMessage);
+    helixDataAccessor.createProperty(keyBuilder.controllerMessage(schedulerMessage.getMsgId()),
+        schedulerMessage);
 
     for (int i = 0; i < 30; i++) {
       Thread.sleep(2000);
@@ -200,8 +340,8 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
 
     Assert.assertEquals(_PARTITIONS, _factory._results.size());
     PropertyKey controllerTaskStatus =
-        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), schedulerMessage
-            .getMessageId().stringify());
+        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+            schedulerMessage.getMsgId());
 
     int messageResultCount = 0;
     for (int i = 0; i < 10; i++) {
@@ -231,11 +371,11 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     String controllerStatusPath =
         HelixUtil.getControllerPropertyPath(manager.getClusterName(),
             PropertyType.STATUSUPDATES_CONTROLLER);
-    List<String> subPaths = _zkClient.getChildren(controllerStatusPath);
+    List<String> subPaths = _gZkClient.getChildren(controllerStatusPath);
     Assert.assertTrue(subPaths.size() > 0);
     for (String subPath : subPaths) {
       String nextPath = controllerStatusPath + "/" + subPath;
-      List<String> subsubPaths = _zkClient.getChildren(nextPath);
+      List<String> subsubPaths = _gZkClient.getChildren(nextPath);
       Assert.assertTrue(subsubPaths.size() > 0);
     }
 
@@ -243,38 +383,38 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
         HelixUtil.getInstancePropertyPath(manager.getClusterName(), "localhost_" + (START_PORT),
             PropertyType.STATUSUPDATES);
 
-    subPaths = _zkClient.getChildren(instanceStatusPath);
+    subPaths = _gZkClient.getChildren(instanceStatusPath);
     Assert.assertTrue(subPaths.size() > 0);
     for (String subPath : subPaths) {
       String nextPath = instanceStatusPath + "/" + subPath;
-      List<String> subsubPaths = _zkClient.getChildren(nextPath);
+      List<String> subsubPaths = _gZkClient.getChildren(nextPath);
       Assert.assertTrue(subsubPaths.size() > 0);
       for (String subsubPath : subsubPaths) {
         String nextnextPath = nextPath + "/" + subsubPath;
-        Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() > 0);
+        Assert.assertTrue(_gZkClient.getChildren(nextnextPath).size() > 0);
       }
     }
     Thread.sleep(3000);
-    ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _zkClient, 0);
+    ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _gZkClient, 0);
     dumpTask.run();
 
-    subPaths = _zkClient.getChildren(controllerStatusPath);
+    subPaths = _gZkClient.getChildren(controllerStatusPath);
     Assert.assertTrue(subPaths.size() > 0);
     for (String subPath : subPaths) {
       String nextPath = controllerStatusPath + "/" + subPath;
-      List<String> subsubPaths = _zkClient.getChildren(nextPath);
+      List<String> subsubPaths = _gZkClient.getChildren(nextPath);
       Assert.assertTrue(subsubPaths.size() == 0);
     }
 
-    subPaths = _zkClient.getChildren(instanceStatusPath);
+    subPaths = _gZkClient.getChildren(instanceStatusPath);
     Assert.assertTrue(subPaths.size() > 0);
     for (String subPath : subPaths) {
       String nextPath = instanceStatusPath + "/" + subPath;
-      List<String> subsubPaths = _zkClient.getChildren(nextPath);
+      List<String> subsubPaths = _gZkClient.getChildren(nextPath);
       Assert.assertTrue(subsubPaths.size() > 0);
       for (String subsubPath : subsubPaths) {
         String nextnextPath = nextPath + "/" + subsubPath;
-        Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() == 0);
+        Assert.assertTrue(_gZkClient.getChildren(nextnextPath).size() == 0);
       }
     }
   }
@@ -284,22 +424,22 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     _factory._results.clear();
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           _factory.getMessageType(), _factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
+
+      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
     }
 
     Message schedulerMessage =
-        new Message(MessageType.SCHEDULER_MSG + "", MessageId.from(UUID.randomUUID().toString()));
-    schedulerMessage.setTgtSessionId(SessionId.from("*"));
+        new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
+    schedulerMessage.setTgtSessionId("*");
     schedulerMessage.setTgtName("CONTROLLER");
     // TODO: change it to "ADMIN" ?
     schedulerMessage.setSrcName("CONTROLLER");
 
     // Template for the individual message sent to each participant
-    Message msg = new Message(_factory.getMessageType(), MessageId.from("Template"));
-    msg.setTgtSessionId(SessionId.from("*"));
+    Message msg = new Message(_factory.getMessageType(), "Template");
+    msg.setTgtSessionId("*");
     msg.setMsgState(MessageState.NEW);
 
     // Criteria to send individual messages
@@ -375,22 +515,22 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           factory.getMessageType(), factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
+
+      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
     }
 
     Message schedulerMessage =
-        new Message(MessageType.SCHEDULER_MSG + "", MessageId.from(UUID.randomUUID().toString()));
-    schedulerMessage.setTgtSessionId(SessionId.from("*"));
+        new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
+    schedulerMessage.setTgtSessionId("*");
     schedulerMessage.setTgtName("CONTROLLER");
     // TODO: change it to "ADMIN" ?
     schedulerMessage.setSrcName("CONTROLLER");
 
     // Template for the individual message sent to each participant
-    Message msg = new Message(factory.getMessageType(), MessageId.from("Template"));
-    msg.setTgtSessionId(SessionId.from("*"));
+    Message msg = new Message(factory.getMessageType(), "Template");
+    msg.setTgtSessionId("*");
     msg.setMsgState(MessageState.NEW);
 
     // Criteria to send individual messages
@@ -416,21 +556,21 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
 
     HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
     Builder keyBuilder = helixDataAccessor.keyBuilder();
-    PropertyKey controllerMessageKey =
-        keyBuilder.controllerMessage(schedulerMessage.getMessageId().stringify());
+    PropertyKey controllerMessageKey = keyBuilder.controllerMessage(schedulerMessage.getMsgId());
     helixDataAccessor.setProperty(controllerMessageKey, schedulerMessage);
 
     Thread.sleep(3000);
 
     Assert.assertEquals(0, factory._results.size());
-
-    waitMessageUpdate("SentMessageCount", schedulerMessage.getMessageId().stringify(),
-        helixDataAccessor);
     PropertyKey controllerTaskStatus =
-        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), schedulerMessage
-            .getMessageId().stringify());
-    waitMessageUpdate("SentMessageCount", schedulerMessage.getMessageId().stringify(),
-        helixDataAccessor);
+        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+            schedulerMessage.getMsgId());
+    for (int i = 0; i < 10; i++) {
+      StatusUpdate update = helixDataAccessor.getProperty(controllerTaskStatus);
+      if (update == null || update.getRecord().getMapField("SentMessageCount") == null) {
+        Thread.sleep(1000);
+      }
+    }
     ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
     Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount").equals("0"));
     int count = 0;
@@ -442,30 +582,28 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
 
   @Test()
   public void testSchedulerMsg3() throws Exception {
-    final int avgReplicas = _PARTITIONS * 3 / 5;
-
     _factory._results.clear();
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           _factory.getMessageType(), _factory);
 
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
-          _factory.getMessageType(), _factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
+      // _participants[i].getMessagingService().registerMessageHandlerFactory(
+      // _factory.getMessageType(), _factory);
+
+      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
     }
 
     Message schedulerMessage =
-        new Message(MessageType.SCHEDULER_MSG + "", MessageId.from(UUID.randomUUID().toString()));
-    schedulerMessage.setTgtSessionId(SessionId.from("*"));
+        new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
+    schedulerMessage.setTgtSessionId("*");
     schedulerMessage.setTgtName("CONTROLLER");
     // TODO: change it to "ADMIN" ?
     schedulerMessage.setSrcName("CONTROLLER");
 
     // Template for the individual message sent to each participant
-    Message msg = new Message(_factory.getMessageType(), MessageId.from("Template"));
-    msg.setTgtSessionId(SessionId.from("*"));
+    Message msg = new Message(_factory.getMessageType(), "Template");
+    msg.setTgtSessionId("*");
     msg.setMsgState(MessageState.NEW);
 
     // Criteria to send individual messages
@@ -518,7 +656,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
 
       sw = new StringWriter();
       mapper.writeValue(sw, cr);
-      schedulerMessage.setMessageId(MessageId.from(UUID.randomUUID().toString()));
+      schedulerMessage.setMsgId(UUID.randomUUID().toString());
       crString = sw.toString();
       schedulerMessage.getRecord().setSimpleField("Criteria", crString);
       manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
@@ -526,35 +664,59 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
           callback._message.getResultMap().get(
               DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
 
-      Thread.sleep(1000);
       HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
       Builder keyBuilder = helixDataAccessor.keyBuilder();
 
-      waitMessageUpdate("Summary", msgId, helixDataAccessor);
+      for (int j = 0; j < 100; j++) {
+        Thread.sleep(200);
+        PropertyKey controllerTaskStatus =
+            keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+        ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+        if (statusUpdate.getMapFields().containsKey("Summary")) {
+          break;
+        }
+      }
 
       PropertyKey controllerTaskStatus =
           keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
       ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
       Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
-          .equals("" + avgReplicas));
+          .equals("" + (_PARTITIONS * 3 / 5)));
       int messageResultCount = 0;
       for (String key : statusUpdate.getMapFields().keySet()) {
         if (key.startsWith("MessageResult")) {
           messageResultCount++;
         }
       }
-      Assert.assertEquals(messageResultCount, avgReplicas);
+      Assert.assertEquals(messageResultCount, _PARTITIONS * 3 / 5);
 
-      int count = 0;
-      // System.out.println(i);
-      for (Set<String> val : _factory._results.values()) {
-        // System.out.println(val);
-        count += val.size();
+
+      boolean success = false;
+      for (int j = 0; j < 6; j++) {
+        int count = 0;
+        // System.out.println(i);
+        for (Set<String> val : _factory._results.values()) {
+          // System.out.println(val);
+          count += val.size();
+        }
+        // System.out.println(count);
+        // Assert.assertEquals(count, _PARTITIONS * 3 / 5 * (i + 1));
+        success = count == _PARTITIONS * 3 / 5 * (i + 1);
+        if (success) {
+          break;
+        }
+        Thread.sleep(500);
       }
-      // System.out.println(count);
+      Assert.assertTrue(success);
+    }
+  }
 
-      Assert.assertEquals(count, avgReplicas * (i + 1));
+  private int count(TestMessagingHandlerFactory factory) {
+    int cnt = 0;
+    for (Set<String> val : factory._results.values()) {
+      cnt += val.size();
     }
+    return cnt;
   }
 
   @Test()
@@ -562,25 +724,26 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     _factory._results.clear();
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
           _factory.getMessageType(), _factory);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
-          _factory.getMessageType(), _factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
+
+      // _participants[i].getMessagingService().registerMessageHandlerFactory(
+      // _factory.getMessageType(), _factory);
+
+      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
     }
 
     Message schedulerMessage =
-        new Message(MessageType.SCHEDULER_MSG + "", MessageId.from(UUID.randomUUID().toString()));
-    schedulerMessage.setTgtSessionId(SessionId.from("*"));
+        new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
+    schedulerMessage.setTgtSessionId("*");
     schedulerMessage.setTgtName("CONTROLLER");
     // TODO: change it to "ADMIN" ?
     schedulerMessage.setSrcName("CONTROLLER");
 
     // Template for the individual message sent to each participant
-    Message msg = new Message(_factory.getMessageType(), MessageId.from("Template"));
-    msg.setTgtSessionId(SessionId.from("*"));
-    msg.setMsgState(MessageState.NEW);
+    Message msgTemplate = new Message(_factory.getMessageType(), "Template");
+    msgTemplate.setTgtSessionId("*");
+    msgTemplate.setMsgState(MessageState.NEW);
 
     // Criteria to send individual messages
     Criteria cr = new Criteria();
@@ -600,7 +763,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     String crString = sw.toString();
 
     schedulerMessage.getRecord().setSimpleField("Criteria", crString);
-    schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
+    schedulerMessage.getRecord().setMapField("MessageTemplate", msgTemplate.getRecord().getSimpleFields());
     schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
     schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
 
@@ -635,8 +798,17 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
         callback._message.getResultMap()
             .get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
 
-    final HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    final Builder keyBuilder = helixDataAccessor.keyBuilder();
+    boolean success = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        return count(_factory) == 60; // TestDB number_of_partitions x replicas
+      }
+    }, 10 * 1000);
+    Assert.assertTrue(success, "If not specifying participant, controller will send 60 messages");
+
+    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = helixDataAccessor.keyBuilder();
     ArrayList<String> msgIds = new ArrayList<String>();
     for (int i = 0; i < NODE_NR; i++) {
       callback = new MockAsyncCallback();
@@ -647,25 +819,34 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
 
       sw = new StringWriter();
       mapper.writeValue(sw, cr);
-      schedulerMessage.setMessageId(MessageId.from(UUID.randomUUID().toString()));
-
-      // need to use a different name for scheduler_task_queue task resource
-      schedulerMessage.getRecord().setSimpleField(
-          DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg4_" + i);
-
+      schedulerMessage.setMsgId(UUID.randomUUID().toString());
       crString = sw.toString();
       schedulerMessage.getRecord().setSimpleField("Criteria", crString);
       manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
+
+      Thread.sleep(5000);
+      System.err.println("count: " + count(_factory));
+
       String msgId =
           callback._message.getResultMap().get(
               DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
       msgIds.add(msgId);
     }
 
-    for (int i = 0; i < NODE_NR; i++) {
-      final String msgId = msgIds.get(i);
+    // System.err.println("count: " + count(_factory));
 
-      waitMessageUpdate("Summary", msgId, helixDataAccessor);
+    for (int i = 0; i < NODE_NR; i++) {
+      String msgId = msgIds.get(i);
+      for (int j = 0; j < 100; j++) {
+        Thread.sleep(200);
+        PropertyKey controllerTaskStatus =
+            keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+        ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+        if (statusUpdate.getMapFields().containsKey("Summary")) {
+          // System.err.println(msgId+" done");
+          break;
+        }
+      }
 
       PropertyKey controllerTaskStatus =
           keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
@@ -678,11 +859,24 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
           messageResultCount++;
         }
       }
+      if (messageResultCount != _PARTITIONS * 3 / 5) {
+        int x = 10;
+        x = x + messageResultCount;
+      }
       Assert.assertEquals(messageResultCount, _PARTITIONS * 3 / 5);
     }
 
-    waitMessageUpdate("Summary", msgIdPrime, helixDataAccessor);
+    for (int j = 0; j < 100; j++) {
+      Thread.sleep(200);
+      PropertyKey controllerTaskStatus =
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgIdPrime);
+      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+      if (statusUpdate.getMapFields().containsKey("Summary")) {
+        break;
+      }
+    }
 
+    // Thread.sleep(5000);
     int count = 0;
     for (Set<String> val : _factory._results.values()) {
       // System.out.println(val);
@@ -692,31 +886,147 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     Assert.assertEquals(count, _PARTITIONS * 3 * 2);
   }
 
-  /**
-   * wait message summary to appear in controller-message-status-update
-   * @param msgId
-   * @param accessor
-   * @return
-   * @throws Exception
-   */
-  private boolean waitMessageUpdate(final String mapKey, final String msgId,
-      final HelixDataAccessor accessor) throws Exception {
-    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    return TestHelper.verify(new TestHelper.Verifier() {
+  @Test
+  public void testSchedulerMsgContraints() throws JsonGenerationException, JsonMappingException,
+      IOException, InterruptedException {
+    TestMessagingHandlerFactoryLatch factory = new TestMessagingHandlerFactoryLatch();
+    HelixManager manager = null;
+    for (int i = 0; i < NODE_NR; i++) {
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
+          factory.getMessageType(), factory);
 
-      @Override
-      public boolean verify() throws Exception {
-        PropertyKey key = keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(), msgId);
-        HelixProperty statusUpdate = accessor.getProperty(key);
-        if (statusUpdate == null) {
-          return false;
-        }
+      _participants[i].getMessagingService().registerMessageHandlerFactory(
+          factory.getMessageType(), factory);
 
-        if (statusUpdate.getRecord().getMapField(mapKey) == null) {
-          return false;
-        }
-        return true;
+      manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager;
+    }
+
+    Message schedulerMessage =
+        new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString());
+    schedulerMessage.setTgtSessionId("*");
+    schedulerMessage.setTgtName("CONTROLLER");
+    // TODO: change it to "ADMIN" ?
+    schedulerMessage.setSrcName("CONTROLLER");
+
+    // Template for the individual message sent to each participant
+    Message msg = new Message(factory.getMessageType(), "Template");
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+
+    // Criteria to send individual messages
+    Criteria cr = new Criteria();
+    cr.setInstanceName("localhost_%");
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setResource("%");
+    cr.setPartition("%");
+
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    StringWriter sw = new StringWriter();
+    mapper.writeValue(sw, cr);
+
+    String crString = sw.toString();
+
+    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+    schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
+    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+    schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
+    schedulerMessage.getRecord().setSimpleField(
+        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgContraints");
+
+    Criteria cr2 = new Criteria();
+    cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
+    cr2.setInstanceName("*");
+    cr2.setSessionSpecific(false);
+
+    MockAsyncCallback callback = new MockAsyncCallback();
+    mapper = new ObjectMapper();
+    serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    sw = new StringWriter();
+    mapper.writeValue(sw, cr);
+
+    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = helixDataAccessor.keyBuilder();
+
+    // Set contraints that only 1 msg per participant
+    Map<String, String> constraints = new TreeMap<String, String>();
+    constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
+    constraints.put("TRANSITION", "OFFLINE-COMPLETED");
+    constraints.put("CONSTRAINT_VALUE", "1");
+    constraints.put("INSTANCE", ".*");
+    manager.getClusterManagmentTool().setConstraint(manager.getClusterName(),
+        ConstraintType.MESSAGE_CONSTRAINT, "constraint1", new ConstraintItem(constraints));
+
+    // Send scheduler message
+    crString = sw.toString();
+    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+    manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
+    String msgId =
+        callback._message.getResultMap()
+            .get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
+
+    for (int j = 0; j < 10; j++) {
+      Thread.sleep(200);
+      PropertyKey controllerTaskStatus =
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+      if (statusUpdate.getMapFields().containsKey("SentMessageCount")) {
+        Assert.assertEquals(
+            statusUpdate.getMapFields().get("SentMessageCount").get("MessageCount"), ""
+                + (_PARTITIONS * 3));
+        break;
       }
-    }, 20 * 1000);
+    }
+
+    for (int i = 0; i < _PARTITIONS * 3 / 5; i++) {
+      for (int j = 0; j < 10; j++) {
+        Thread.sleep(300);
+        if (factory._messageCount == 5 * (i + 1))
+          break;
+      }
+      Thread.sleep(300);
+      Assert.assertEquals(factory._messageCount, 5 * (i + 1));
+      factory.signal();
+      // System.err.println(i);
+    }
+
+    for (int j = 0; j < 10; j++) {
+      Thread.sleep(200);
+      PropertyKey controllerTaskStatus =
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+      if (statusUpdate.getMapFields().containsKey("Summary")) {
+        break;
+      }
+    }
+
+    Assert.assertEquals(_PARTITIONS, factory._results.size());
+    PropertyKey controllerTaskStatus =
+        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
+    ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
+    Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
+        .equals("" + (_PARTITIONS * 3)));
+    int messageResultCount = 0;
+    for (String key : statusUpdate.getMapFields().keySet()) {
+      if (key.startsWith("MessageResult ")) {
+        messageResultCount++;
+      }
+    }
+    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+
+    int count = 0;
+    for (Set<String> val : factory._results.values()) {
+      count += val.size();
+    }
+    Assert.assertEquals(count, _PARTITIONS * 3);
+
+    manager.getClusterManagmentTool().removeConstraint(manager.getClusterName(),
+        ConstraintType.MESSAGE_CONSTRAINT, "constraint1");
+
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
deleted file mode 100644
index b887fe7..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java
+++ /dev/null
@@ -1,254 +0,0 @@
-package org.apache.helix.integration;
-
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.helix.Criteria;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.id.MessageId;
-import org.apache.helix.api.id.SessionId;
-import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import org.apache.helix.messaging.AsyncCallback;
-import org.apache.helix.messaging.handling.HelixTaskResult;
-import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
-import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.ConstraintItem;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestSchedulerMsgContraints extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
-
-  class MockAsyncCallback extends AsyncCallback {
-    Message _message;
-
-    public MockAsyncCallback() {
-    }
-
-    @Override
-    public void onTimeOut() {
-      // TODO Auto-generated method stub
-    }
-
-    @Override
-    public void onReplyMessage(Message message) {
-      _message = message;
-    }
-  }
-
-  public static class TestMessagingHandlerFactoryLatch implements MessageHandlerFactory {
-    public volatile CountDownLatch _latch = new CountDownLatch(1);
-    public int _messageCount = 0;
-    public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
-
-    @Override
-    public synchronized MessageHandler createHandler(Message message, NotificationContext context) {
-      _messageCount++;
-      return new TestMessagingHandlerLatch(message, context);
-    }
-
-    public synchronized void signal() {
-      _latch.countDown();
-      _latch = new CountDownLatch(1);
-    }
-
-    @Override
-    public String getMessageType() {
-      return "TestMessagingHandlerLatch";
-    }
-
-    @Override
-    public void reset() {
-      // TODO Auto-generated method stub
-    }
-
-    public class TestMessagingHandlerLatch extends MessageHandler {
-      public TestMessagingHandlerLatch(Message message, NotificationContext context) {
-        super(message, context);
-        // TODO Auto-generated constructor stub
-      }
-
-      @Override
-      public HelixTaskResult handleMessage() throws InterruptedException {
-        _latch.await();
-        HelixTaskResult result = new HelixTaskResult();
-        result.setSuccess(true);
-        result.getTaskResultMap().put("Message", _message.getMessageId().stringify());
-        String destName = _message.getTgtName();
-        synchronized (_results) {
-          if (!_results.containsKey(_message.getPartitionId().stringify())) {
-            _results
-                .put(_message.getPartitionId().stringify(), new ConcurrentSkipListSet<String>());
-          }
-        }
-        _results.get(_message.getPartitionId().stringify()).add(destName);
-        // System.err.println("Message " + _message.getMsgId() + " executed");
-        return result;
-      }
-
-      @Override
-      public void onError(Exception e, ErrorCode code, ErrorType type) {
-        // TODO Auto-generated method stub
-      }
-    }
-  }
-
-  @Test
-  public void testSchedulerMsgContraints() throws Exception {
-    TestMessagingHandlerFactoryLatch factory = new TestMessagingHandlerFactoryLatch();
-    HelixManager manager = null;
-    for (int i = 0; i < NODE_NR; i++) {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
-          factory.getMessageType(), factory);
-      //
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
-          factory.getMessageType(), factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
-    }
-
-    Message schedulerMessage =
-        new Message(MessageType.SCHEDULER_MSG + "", MessageId.from(UUID.randomUUID().toString()));
-    schedulerMessage.setTgtSessionId(SessionId.from("*"));
-    schedulerMessage.setTgtName("CONTROLLER");
-    // TODO: change it to "ADMIN" ?
-    schedulerMessage.setSrcName("CONTROLLER");
-
-    // Template for the individual message sent to each participant
-    Message msg = new Message(factory.getMessageType(), MessageId.from("Template"));
-    msg.setTgtSessionId(SessionId.from("*"));
-    msg.setMsgState(MessageState.NEW);
-
-    // Criteria to send individual messages
-    Criteria cr = new Criteria();
-    cr.setInstanceName("localhost_%");
-    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    cr.setSessionSpecific(false);
-    cr.setResource("%");
-    cr.setPartition("%");
-
-    ObjectMapper mapper = new ObjectMapper();
-    SerializationConfig serializationConfig = mapper.getSerializationConfig();
-    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
-    StringWriter sw = new StringWriter();
-    mapper.writeValue(sw, cr);
-
-    String crString = sw.toString();
-
-    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
-    schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
-    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
-    schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
-    schedulerMessage.getRecord().setSimpleField(
-        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgContraints");
-
-    Criteria cr2 = new Criteria();
-    cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
-    cr2.setInstanceName("*");
-    cr2.setSessionSpecific(false);
-
-    MockAsyncCallback callback = new MockAsyncCallback();
-    mapper = new ObjectMapper();
-    serializationConfig = mapper.getSerializationConfig();
-    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
-    sw = new StringWriter();
-    mapper.writeValue(sw, cr);
-
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
-
-    // Set contraints that only 1 msg per participant
-    Map<String, String> constraints = new TreeMap<String, String>();
-    constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
-    constraints.put("TRANSITION", "OFFLINE-COMPLETED");
-    constraints.put("CONSTRAINT_VALUE", "1");
-    constraints.put("INSTANCE", ".*");
-    manager.getClusterManagmentTool().setConstraint(manager.getClusterName(),
-        ConstraintType.MESSAGE_CONSTRAINT, "constraint1", new ConstraintItem(constraints));
-
-    // Send scheduler message
-    crString = sw.toString();
-    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
-    manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
-    String msgId =
-        callback._message.getResultMap()
-            .get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
-
-    for (int j = 0; j < 10; j++) {
-      Thread.sleep(200);
-      PropertyKey controllerTaskStatus =
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
-      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
-      if (statusUpdate.getMapFields().containsKey("SentMessageCount")) {
-        Assert.assertEquals(
-            statusUpdate.getMapFields().get("SentMessageCount").get("MessageCount"), ""
-                + (_PARTITIONS * 3));
-        break;
-      }
-    }
-
-    for (int i = 0; i < _PARTITIONS * 3 / 5; i++) {
-      for (int j = 0; j < 10; j++) {
-        Thread.sleep(300);
-        if (factory._messageCount == 5 * (i + 1))
-          break;
-      }
-      Thread.sleep(300);
-      Assert.assertEquals(factory._messageCount, 5 * (i + 1));
-      factory.signal();
-      // System.err.println(i);
-    }
-
-    for (int j = 0; j < 10; j++) {
-      Thread.sleep(200);
-      PropertyKey controllerTaskStatus =
-          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
-      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
-      if (statusUpdate.getMapFields().containsKey("Summary")) {
-        break;
-      }
-    }
-
-    Assert.assertEquals(_PARTITIONS, factory._results.size());
-    PropertyKey controllerTaskStatus =
-        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId);
-    ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
-    Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
-        .equals("" + (_PARTITIONS * 3)));
-    int messageResultCount = 0;
-    for (String key : statusUpdate.getMapFields().keySet()) {
-      if (key.startsWith("MessageResult ")) {
-        messageResultCount++;
-      }
-    }
-    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
-
-    int count = 0;
-    for (Set<String> val : factory._results.values()) {
-      count += val.size();
-    }
-    Assert.assertEquals(count, _PARTITIONS * 3);
-
-    manager.getClusterManagmentTool().removeConstraint(manager.getClusterName(),
-        ConstraintType.MESSAGE_CONSTRAINT, "constraint1");
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
deleted file mode 100644
index bbaa18d..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java
+++ /dev/null
@@ -1,181 +0,0 @@
-package org.apache.helix.integration;
-
-import java.io.StringWriter;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.helix.Criteria;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.api.id.MessageId;
-import org.apache.helix.api.id.SessionId;
-import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import org.apache.helix.messaging.handling.HelixTaskResult;
-import org.apache.helix.messaging.handling.MessageHandler;
-import org.apache.helix.messaging.handling.MessageHandlerFactory;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.helix.model.Message.MessageType;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestSchedulerMsgUsingQueue extends ZkStandAloneCMTestBaseWithPropertyServerCheck {
-  public static class TestMessagingHandlerFactory implements MessageHandlerFactory {
-    int cnt;
-    public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
-
-    public TestMessagingHandlerFactory() {
-      super();
-      cnt = 0;
-    }
-
-    @Override
-    public MessageHandler createHandler(Message message, NotificationContext context) {
-      // System.out.println("\t create-hdlr: " + message.getId());
-      return new TestMessagingHandler(message, context);
-    }
-
-    @Override
-    public String getMessageType() {
-      return "TestParticipant";
-    }
-
-    @Override
-    public void reset() {
-      // TODO Auto-generated method stub
-
-    }
-
-    public class TestMessagingHandler extends MessageHandler {
-      public TestMessagingHandler(Message message, NotificationContext context) {
-        super(message, context);
-        // TODO Auto-generated constructor stub
-      }
-
-      @Override
-      public HelixTaskResult handleMessage() throws InterruptedException {
-        HelixTaskResult result = new HelixTaskResult();
-        result.setSuccess(true);
-        // String tgtName = _message.getTgtName();
-        String messageId = _message.getMessageId().stringify();
-        String partitionId = _message.getPartitionId().stringify();
-
-        result.getTaskResultMap().put("Message", messageId);
-        synchronized (_results) {
-          if (!_results.containsKey(partitionId)) {
-            _results.put(partitionId, new HashSet<String>());
-          }
-          _results.get(partitionId).add(messageId);
-        }
-        cnt++;
-        // System.err.println(cnt + ": message " + messageId + ", tgtName: " + tgtName
-        // + ", partitionId: " + partitionId);
-        return result;
-      }
-
-      @Override
-      public void onError(Exception e, ErrorCode code, ErrorType type) {
-        // TODO Auto-generated method stub
-      }
-    }
-  }
-
-  @Test()
-  public void testSchedulerMsgUsingQueue() throws Exception {
-    // Logger.getRootLogger().setLevel(Level.INFO);
-    // _factory._results.clear();
-    TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
-
-    HelixManager manager = null;
-    for (int i = 0; i < NODE_NR; i++) {
-      String hostDest = "localhost_" + (START_PORT + i);
-      _startCMResultMap.get(hostDest)._manager.getMessagingService().registerMessageHandlerFactory(
-          factory.getMessageType(), factory);
-      manager = _startCMResultMap.get(hostDest)._manager;
-    }
-
-    Message schedulerMessage =
-        new Message(MessageType.SCHEDULER_MSG + "", MessageId.from(UUID.randomUUID().toString()));
-    schedulerMessage.setTgtSessionId(SessionId.from("*"));
-    schedulerMessage.setTgtName("CONTROLLER");
-    // TODO: change it to "ADMIN" ?
-    schedulerMessage.setSrcName("CONTROLLER");
-    schedulerMessage.getRecord().setSimpleField(
-        DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgUsingQueue");
-    // Template for the individual message sent to each participant
-    Message msg = new Message(factory.getMessageType(), MessageId.from("Template"));
-    msg.setTgtSessionId(SessionId.from("*"));
-    msg.setMsgState(MessageState.NEW);
-
-    // Criteria to send individual messages
-    Criteria cr = new Criteria();
-    cr.setInstanceName("localhost_%");
-    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    cr.setSessionSpecific(false);
-    cr.setResource("%");
-    cr.setPartition("%");
-
-    ObjectMapper mapper = new ObjectMapper();
-    SerializationConfig serializationConfig = mapper.getSerializationConfig();
-    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
-    StringWriter sw = new StringWriter();
-    mapper.writeValue(sw, cr);
-
-    String crString = sw.toString();
-
-    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
-    schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields());
-    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
-
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
-    helixDataAccessor.createProperty(
-        keyBuilder.controllerMessage(schedulerMessage.getMessageId().stringify()), schedulerMessage);
-
-    for (int i = 0; i < 30; i++) {
-      Thread.sleep(2000);
-      if (_PARTITIONS == factory._results.size()) {
-        break;
-      }
-    }
-
-    Assert.assertEquals(_PARTITIONS, factory._results.size());
-    PropertyKey controllerTaskStatus =
-        keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), schedulerMessage
-            .getMessageId().stringify());
-
-    int messageResultCount = 0;
-    for (int i = 0; i < 10; i++) {
-      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
-      Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
-          .equals("" + (_PARTITIONS * 3)));
-      for (String key : statusUpdate.getMapFields().keySet()) {
-        if (key.startsWith("MessageResult ")) {
-          messageResultCount++;
-        }
-      }
-      if (messageResultCount == _PARTITIONS * 3) {
-        break;
-      } else {
-        Thread.sleep(2000);
-      }
-    }
-    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
-    int count = 0;
-    for (Set<String> val : factory._results.values()) {
-      count += val.size();
-    }
-    Assert.assertEquals(count, _PARTITIONS * 3);
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
index 3024f45..a927520 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchemataSM.java
@@ -27,10 +27,10 @@ import org.apache.helix.HelixConstants;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -46,7 +46,7 @@ public class TestSchemataSM extends ZkIntegrationTestBase {
     String clusterName = className + "_" + methodName;
     int n = 5;
 
-    MockParticipant[] participants = new MockParticipant[n];
+    MockParticipantManager[] participants = new MockParticipantManager[n];
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
@@ -70,14 +70,15 @@ public class TestSchemataSM extends ZkIntegrationTestBase {
         Arrays.asList(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString()));
     accessor.setProperty(key, idealState);
 
-    ClusterController controller = new ClusterController(clusterName, "controller", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
     controller.syncStart();
 
     // start n-1 participants
     for (int i = 1; i < n; i++) {
       String instanceName = "localhost_" + (12918 + i);
 
-      participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
@@ -87,7 +88,7 @@ public class TestSchemataSM extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // start the remaining 1 participant
-    participants[0] = new MockParticipant(clusterName, "localhost_12918", ZK_ADDR, null);
+    participants[0] = new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12918");
     participants[0].syncStart();
 
     // make sure we have all participants in MASTER state
@@ -107,6 +108,7 @@ public class TestSchemataSM extends ZkIntegrationTestBase {
     }
 
     // clean up
+    controller.syncStop();
     for (int i = 0; i < n; i++) {
       participants[i].syncStop();
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
index 1b69572..4abb519 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
@@ -22,31 +22,29 @@ package org.apache.helix.integration;
 import java.util.Date;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.mock.controller.ClusterController;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
+  private static Logger LOG = Logger.getLogger(TestSessionExpiryInTransition.class);
 
   public class SessionExpiryTransition extends MockTransition {
     private final AtomicBoolean _done = new AtomicBoolean();
 
     @Override
     public void doTransition(Message message, NotificationContext context) {
-      ZkHelixTestManager manager = (ZkHelixTestManager) context.getManager();
+      MockParticipantManager manager = (MockParticipantManager) context.getManager();
 
       String instance = message.getTgtName();
       PartitionId partition = message.getPartitionId();
@@ -57,8 +55,7 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
         try {
           ZkTestHelper.expireSession(manager.getZkClient());
         } catch (Exception e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
+          LOG.error("Exception expire zk-session", e);
         }
       }
     }
@@ -66,7 +63,7 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
 
   @Test
   public void testSessionExpiryInTransition() throws Exception {
-    Logger.getRootLogger().setLevel(Level.WARN);
+    // Logger.getRootLogger().setLevel(Level.WARN);
 
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
@@ -74,7 +71,7 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
 
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
@@ -86,15 +83,15 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
         "MasterSlave", true); // do rebalance
 
     // start controller
-    ClusterController controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
     controller.syncStart();
 
     // start participants
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      ZkHelixTestManager manager =
-          new ZkHelixTestManager(clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
-      participants[i] = new MockParticipant(manager, new SessionExpiryTransition());
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].setTransition(new SessionExpiryTransition());
       participants[i].syncStart();
     }
 
@@ -104,13 +101,11 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // clean up
+    controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }
 
-    Thread.sleep(2000);
-    controller.syncStop();
-
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
index 02a34d8..6eb7a8c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMMain.java
@@ -21,9 +21,15 @@ package org.apache.helix.integration;
 
 import java.util.Date;
 
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.TestHelper.Verifier;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.LiveInstance;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
@@ -35,16 +41,34 @@ public class TestStandAloneCMMain extends ZkStandAloneCMTestBase {
   @Test()
   public void testStandAloneCMMain() throws Exception {
     logger.info("RUN testStandAloneCMMain() at " + new Date(System.currentTimeMillis()));
-
+    ClusterControllerManager newController = null;
     for (int i = 1; i <= 2; i++) {
       String controllerName = "controller_" + i;
-      StartCMResult startResult =
-          TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
-              HelixControllerMain.STANDALONE);
-      _startCMResultMap.put(controllerName, startResult);
+      newController =
+          new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+      newController.syncStart();
     }
 
-    stopCurrentLeader(_zkClient, CLUSTER_NAME, _startCMResultMap);
+    // stopCurrentLeader(_zkClient, CLUSTER_NAME, _startCMResultMap);
+    _controller.syncStop();
+
+    final HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    final String newControllerName = newController.getInstanceName();
+    TestHelper.verify(new Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+        if (leader == null) {
+          return false;
+        }
+        return leader.getInstanceName().equals(newControllerName);
+
+      }
+    }, 30 * 1000);
+
     boolean result =
         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
             ZK_ADDR, CLUSTER_NAME));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
index 81c08ed..dad998d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStandAloneCMSessionExpiry.java
@@ -21,11 +21,10 @@ package org.apache.helix.integration;
 
 import java.util.Date;
 
-import org.apache.helix.InstanceType;
 import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
 import org.apache.helix.ZkTestHelper;
-import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
@@ -47,18 +46,16 @@ public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase {
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, PARTICIPANT_PREFIX, "TestDB", 1, 20, 5, 3,
         "MasterSlave", true);
 
-    MockParticipant[] participants = new MockParticipant[5];
+    MockParticipantManager[] participants = new MockParticipantManager[5];
     for (int i = 0; i < 5; i++) {
       String instanceName = "localhost_" + (12918 + i);
-      ZkHelixTestManager manager =
-          new ZkHelixTestManager(clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
-      participants[i] = new MockParticipant(manager, null);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
       participants[i].syncStart();
     }
 
-    ZkHelixTestManager controller =
-        new ZkHelixTestManager(clusterName, "controller_0", InstanceType.CONTROLLER, ZK_ADDR);
-    controller.connect();
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
 
     boolean result;
     result =
@@ -67,7 +64,7 @@ public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // participant session expiry
-    ZkHelixTestManager participantToExpire = participants[1].getManager();
+    MockParticipantManager participantToExpire = participants[1];
 
     System.out.println("Expire participant session");
     String oldSessionId = participantToExpire.getSessionId();
@@ -107,8 +104,7 @@ public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase {
     // clean up
     System.out.println("Clean up ...");
     // Logger.getRootLogger().setLevel(Level.DEBUG);
-    controller.disconnect();
-    Thread.sleep(100);
+    controller.syncStop();
     for (int i = 0; i < 5; i++) {
       participants[i].syncStop();
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
index dce3fd4..d191c18 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStartMultipleControllersWithSameName.java
@@ -25,7 +25,7 @@ import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
-import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -54,10 +54,10 @@ public class TestStartMultipleControllersWithSameName extends ZkIntegrationTestB
     // rebalance
 
     // start controller
-    ClusterController[] controllers = new ClusterController[4];
+    ClusterControllerManager[] controllers = new ClusterControllerManager[4];
     for (int i = 0; i < 4; i++) {
-      controllers[i] = new ClusterController(clusterName, "controller_0", ZK_ADDR);
-      controllers[i].start();
+      controllers[i] = new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+      controllers[i].syncStart();
     }
 
     Thread.sleep(500); // wait leader election finishes
@@ -69,7 +69,6 @@ public class TestStartMultipleControllersWithSameName extends ZkIntegrationTestB
     // clean up
     for (int i = 0; i < 4; i++) {
       controllers[i].syncStop();
-      Thread.sleep(1000); // wait for all zk callbacks done
     }
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
index aff40b3..318177a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
@@ -29,17 +29,13 @@ import java.util.Set;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.controller.HelixControllerMain;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
 import org.apache.helix.mock.participant.MockMSStateModel;
-import org.apache.helix.mock.participant.MockParticipant;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.mock.participant.SleepTransition;
 import org.apache.helix.model.ExternalView;
@@ -60,15 +56,14 @@ import org.testng.annotations.Test;
 public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
   private static Logger LOG = Logger.getLogger(TestStateTransitionTimeout.class);
 
+  @Override
   @BeforeClass
   public void beforeClass() throws Exception {
     System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
 
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
     String namespace = "/" + CLUSTER_NAME;
-    if (_zkClient.exists(namespace)) {
-      _zkClient.deleteRecursive(namespace);
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
     }
     _setupTool = new ClusterSetup(ZK_ADDR);
 
@@ -107,12 +102,14 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
       _sleep = sleep;
     }
 
+    @Override
     @Transition(to = "SLAVE", from = "OFFLINE")
     public void onBecomeSlaveFromOffline(Message message, NotificationContext context) {
       LOG.info("Become SLAVE from OFFLINE");
 
     }
 
+    @Override
     @Transition(to = "MASTER", from = "SLAVE")
     public void onBecomeMasterFromSlave(Message message, NotificationContext context)
         throws InterruptedException {
@@ -122,23 +119,27 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
       }
     }
 
+    @Override
     @Transition(to = "SLAVE", from = "MASTER")
     public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
       LOG.info("Become SLAVE from MASTER");
     }
 
+    @Override
     @Transition(to = "OFFLINE", from = "SLAVE")
     public void onBecomeOfflineFromSlave(Message message, NotificationContext context) {
       LOG.info("Become OFFLINE from SLAVE");
 
     }
 
+    @Override
     @Transition(to = "DROPPED", from = "OFFLINE")
     public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
       LOG.info("Become DROPPED from OFFLINE");
 
     }
 
+    @Override
     public void rollbackOnError(Message message, NotificationContext context,
         StateTransitionError error) {
       _error = error;
@@ -172,7 +173,7 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
   @Test
   public void testStateTransitionTimeOut() throws Exception {
     Map<String, SleepStateModelFactory> factories = new HashMap<String, SleepStateModelFactory>();
-    MockParticipant[] participants = new MockParticipant[NODE_NR];
+    // MockParticipantManager[] participants = new MockParticipantManager[NODE_NR];
     IdealState idealState =
         _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
     for (int i = 0; i < NODE_NR; i++) {
@@ -185,19 +186,20 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
         }
       }
 
-      participants[i] = new MockParticipant(factory, CLUSTER_NAME, instanceName, ZK_ADDR, null);
-      participants[i].syncStart();
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", factory);
+      _participants[i].syncStart();
     }
     String controllerName = CONTROLLER_PREFIX + "_0";
-    StartCMResult startResult =
-        TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
-            HelixControllerMain.STANDALONE);
-    _startCMResultMap.put(controllerName, startResult);
+    _controller =
+        new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
     boolean result =
         ClusterStateVerifier
             .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
     Assert.assertTrue(result);
-    HelixDataAccessor accessor = participants[0].getManager().getHelixDataAccessor();
+    HelixDataAccessor accessor = _participants[0].getHelixDataAccessor();
 
     Builder kb = accessor.keyBuilder();
     ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
index b2a5719..6cce716 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
@@ -21,9 +21,8 @@ package org.apache.helix.integration;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.TestHelper;
-import org.apache.helix.TestHelper.StartCMResult;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -33,8 +32,7 @@ import org.testng.annotations.Test;
 public class TestSwapInstance extends ZkStandAloneCMTestBase {
   @Test
   public void TestSwap() throws Exception {
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    HelixManager manager = _startCMResultMap.get(controllerName)._manager;
+    HelixManager manager = _controller;
     HelixDataAccessor helixAccessor = manager.getHelixDataAccessor();
     _setupTool.addResourceToCluster(CLUSTER_NAME, "MyDB", 64, STATE_MODEL);
     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", _replica);
@@ -49,7 +47,7 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase {
     idealStateOld2.merge(is2.getRecord());
 
     String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
-    ZKHelixAdmin tool = new ZKHelixAdmin(_zkClient);
+    ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
     _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, false);
 
     boolean result =
@@ -68,8 +66,7 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase {
     }
     Assert.assertTrue(exception);
 
-    _startCMResultMap.get(instanceName)._manager.disconnect();
-    _startCMResultMap.get(instanceName)._thread.interrupt();
+    _participants[0].syncStop();
     Thread.sleep(1000);
 
     exception = false;
@@ -80,8 +77,9 @@ public class TestSwapInstance extends ZkStandAloneCMTestBase {
       exception = true;
     }
     Assert.assertFalse(exception);
-    StartCMResult result2 = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName2);
-    _startCMResultMap.put(instanceName2, result2);
+    MockParticipantManager newParticipant =
+        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName2);
+    newParticipant.syncStart();
 
     result =
         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(