You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by sl...@apache.org on 2013/02/15 23:20:13 UTC

[1/2] git commit: Helix task queue work. Scheduler messages that are sent to the controller are put in a task queue resource group, and the actual tasks are executed in a SchedulerTask state model. Message handling constraints can be applied at the resource level or the cluster level.

Helix task queue work. Scheduler messages that are sent to the controller
are put in a task queue resource group, and the actual tasks are executed
in a SchedulerTask state model. Message handling constraints can be applied
at the resource level or the cluster level, against the OFFLINE-COMPLETED
state transition type.

Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/5e8a912b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/5e8a912b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/5e8a912b

Branch: refs/heads/master
Commit: 5e8a912bd4f4c83ad84cb8f92d3f4acca56972cf
Parents: 88cd214
Author: slu2011 <lu...@gmail.com>
Authored: Fri Feb 15 14:19:24 2013 -0800
Committer: slu2011 <lu...@gmail.com>
Committed: Fri Feb 15 14:19:24 2013 -0800

----------------------------------------------------------------------
 .../webapp/resources/SchedulerTasksResource.java   |   18 +-
 .../org/apache/helix/ClusterMessagingService.java  |   13 +
 .../stages/ExternalViewComputeStage.java           |  154 ++++-
 .../controller/stages/MessageGenerationPhase.java  |   36 +-
 .../zk/DefaultSchedulerMessageHandlerFactory.java  |  123 +++-
 .../apache/helix/manager/zk/ZKHelixManager.java    |    4 +
 .../helix/messaging/DefaultMessagingService.java   |    2 +-
 .../messaging/handling/HelixTaskExecutor.java      |    4 +-
 .../main/java/org/apache/helix/model/Message.java  |    3 +-
 .../statemachine/ScheduledTaskStateModel.java      |  127 ++++
 .../ScheduledTaskStateModelFactory.java            |   43 ++
 .../java/org/apache/helix/tools/ClusterSetup.java  |    3 +
 .../helix/tools/StateModelConfigGenerator.java     |   63 ++
 .../src/test/java/org/apache/helix/Mocks.java      |    8 +
 .../helix/integration/TestSchedulerMessage.java    |  581 ++++++++++++++-
 15 files changed, 1129 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
index deb498b..097771e 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
@@ -31,6 +31,7 @@ import org.apache.helix.HelixException;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
@@ -62,6 +63,7 @@ public class SchedulerTasksResource extends Resource
 
   public static String CRITERIA = "Criteria";
   public static String MESSAGETEMPLATE = "MessageTemplate";
+  public static String TASKQUEUENAME = "TaskQueueName";
   public SchedulerTasksResource(Context context,
           Request request,
           Response response) 
@@ -152,17 +154,27 @@ public class SchedulerTasksResource extends Resource
       {
         throw new HelixException("SchedulerTasksResource need to have Criteria specified.");
       }
+      HelixDataAccessor accessor = ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
+      LiveInstance leader = accessor.getProperty(accessor.keyBuilder().controllerLeader());
+      if(leader == null)
+      {
+        throw new HelixException("There is no leader for the cluster " + clusterName);
+      }
       
       Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG, UUID.randomUUID().toString());
       schedulerMessage.getRecord().getSimpleFields().put(CRITERIA, criteriaString);
       
       schedulerMessage.getRecord().getMapFields().put(MESSAGETEMPLATE, messageTemplate);
       
-      schedulerMessage.setTgtSessionId("*");
+      schedulerMessage.setTgtSessionId(leader.getSessionId());
       schedulerMessage.setTgtName("CONTROLLER");
       schedulerMessage.setSrcInstanceType(InstanceType.CONTROLLER);
-      
-      HelixDataAccessor accessor = ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
+      String taskQueueName =  ClusterRepresentationUtil.getFormJsonParameterString(form, TASKQUEUENAME);
+      if(taskQueueName != null)
+      {
+        throw new HelixException("SchedulerTasksResource need to have " + TASKQUEUENAME +" specified.");
+      }
+      schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, taskQueueName);
       accessor.setProperty(accessor.keyBuilder().controllerMessage(schedulerMessage.getMsgId()), schedulerMessage);
       
       Map<String, String> resultMap = new HashMap<String, String>();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
index 999c05b..dc4a0ad 100644
--- a/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
@@ -19,6 +19,9 @@ package org.apache.helix;
  * under the License.
  */
 
+import java.util.List;
+import java.util.Map;
+
 import org.apache.helix.messaging.AsyncCallback;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.Message;
@@ -114,5 +117,15 @@ public interface ClusterMessagingService
   public void registerMessageHandlerFactory(String type,
       MessageHandlerFactory factory);
   
+  /**
+   * This will generate all messages to be sent given the recipientCriteria and MessageTemplate,
+   * the messages are not sent.
+   * 
+   * @param receipientCriteria
+   * @param messageTemplate
+   * @return
+   */
+  public Map<InstanceType, List<Message>> generateMessage(final Criteria recipientCriteria,
+      final Message messageTemplate);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index a691a25..ec57c17 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -20,17 +20,30 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordDelta;
+import org.apache.helix.ZNRecordDelta.MergeOperation;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
+import org.apache.helix.model.StatusUpdate;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
@@ -101,12 +114,15 @@ public class ExternalViewComputeStage extends AbstractBaseStage
       // Update cluster status monitor mbean
       ClusterStatusMonitor clusterStatusMonitor =
           (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
-      if (clusterStatusMonitor != null)
+      IdealState idealState = cache._idealStateMap.get(view.getResourceName());
+      if(idealState != null)
       {
-        clusterStatusMonitor.onExternalViewChange(view,
+        if (clusterStatusMonitor != null && !idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE))
+        {
+          clusterStatusMonitor.onExternalViewChange(view,
                                                   cache._idealStateMap.get(view.getResourceName()));
+        }
       }
-
       // compare the new external view with current one, set only on different
       Map<String, ExternalView> curExtViews =
           dataAccessor.getChildValuesMap(manager.getHelixDataAccessor()
@@ -118,11 +134,20 @@ public class ExternalViewComputeStage extends AbstractBaseStage
       {
         keys.add(manager.getHelixDataAccessor().keyBuilder().externalView(resourceName));
         newExtViews.add(view);
-        // dataAccessor.setProperty(PropertyType.EXTERNALVIEW, view,
-        // resourceName);
+        // dataAccessor.setProperty(PropertyType.EXTERNALVIEW, view, resourceName);
+        
+        // For SCHEDULER_TASK_RESOURCE resource group (helix task queue), we need to find out which task 
+        // partitions are finished (COMPLETED or ERROR), update the status update of the original scheduler 
+        // message, and then remove the partitions from the ideal state
+        if(idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE))
+        {
+          updateScheduledTaskStatus(view, manager, idealState);
+        }
       }
     }
-
+    // TODO: consider not setting the externalview of SCHEDULER_TASK_QUEUE at all. 
+    // Are there any entity that will be interested in its change?
+    
     if (newExtViews.size() > 0)
     {
       dataAccessor.setChildren(keys, newExtViews);
@@ -132,5 +157,122 @@ public class ExternalViewComputeStage extends AbstractBaseStage
     log.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime)
         + " ms");
   }
+  
+  private void updateScheduledTaskStatus(ExternalView ev, HelixManager manager, IdealState taskQueueIdealState)
+  {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    ZNRecord finishedTasks = new ZNRecord(ev.getResourceName());
+    
+    // Place holder for finished partitions
+    Map<String, String> emptyMap = new HashMap<String, String>();
+    List<String> emptyList = new LinkedList<String>();
+    
+    Map<String, Integer> controllerMsgIdCountMap = new HashMap<String, Integer>();
+    Map<String, Map<String, String>> controllerMsgUpdates = new HashMap<String, Map<String, String>>();
+    
+    Builder keyBuilder = accessor.keyBuilder();
+          
+    for(String taskPartitionName : ev.getPartitionSet())
+    {
+      for(String taskState : ev.getStateMap(taskPartitionName).values())
+      {
+        if(taskState.equalsIgnoreCase("ERROR") || taskState.equalsIgnoreCase("COMPLETED"))
+        {
+          log.info(taskPartitionName + " finished as " + taskState);
+          finishedTasks.getListFields().put(taskPartitionName, emptyList);
+          finishedTasks.getMapFields().put(taskPartitionName, emptyMap);
+          
+          // Update original scheduler message status update
+          if(taskQueueIdealState.getRecord().getMapField(taskPartitionName) != null)
+          {
+            String controllerMsgId 
+              = taskQueueIdealState.getRecord().getMapField(taskPartitionName).get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
+            if(controllerMsgId != null)
+            {
+              log.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
+              if(!controllerMsgUpdates.containsKey(controllerMsgId))
+              {
+                controllerMsgUpdates.put(controllerMsgId, new HashMap<String, String>());
+              }
+              controllerMsgUpdates.get(controllerMsgId).put(taskPartitionName, taskState);
+            }
+          }
+        }
+      }
+    }
+    // fill the controllerMsgIdCountMap
+    for(String taskId : taskQueueIdealState.getPartitionSet())
+    {
+      String controllerMsgId 
+        = taskQueueIdealState.getRecord().getMapField(taskId).get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
+      if(controllerMsgId != null)
+      {
+        if(!controllerMsgIdCountMap.containsKey(controllerMsgId))
+        {
+          controllerMsgIdCountMap.put(controllerMsgId, 0);
+        }
+        controllerMsgIdCountMap.put(controllerMsgId, (controllerMsgIdCountMap.get(controllerMsgId) + 1));
+      }
+    }
+    
+    if(controllerMsgUpdates.size() > 0)
+    {
+      for(String controllerMsgId : controllerMsgUpdates.keySet())
+      {
+        PropertyKey controllerStatusUpdateKey 
+          = keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), controllerMsgId);
+        StatusUpdate controllerStatusUpdate = accessor.getProperty(controllerStatusUpdateKey);
+        for(String taskPartitionName : controllerMsgUpdates.get(controllerMsgId).keySet())
+        {
+          Map<String, String> result = new HashMap<String, String>();
+          result.put("Result", controllerMsgUpdates.get(controllerMsgId).get(taskPartitionName));
+          controllerStatusUpdate.getRecord().setMapField("MessageResult "  + 
+             taskQueueIdealState.getRecord().getMapField(taskPartitionName).get(Message.Attributes.TGT_NAME.toString()) + " " + taskPartitionName + " " + 
+             taskQueueIdealState.getRecord().getMapField(taskPartitionName).get(Message.Attributes.MSG_ID.toString())
+             , result);
+        }
+        // All done for the scheduled tasks that came from controllerMsgId, add summary for it
+        if(controllerMsgUpdates.get(controllerMsgId).size() == controllerMsgIdCountMap.get(controllerMsgId).intValue())
+        {
+          int finishedTasksNum = 0;
+          int completedTasksNum = 0;
+          for(String key : controllerStatusUpdate.getRecord().getMapFields().keySet())
+          {
+            if(key.startsWith("MessageResult "))
+            {
+              finishedTasksNum ++;
+            }
+            if(controllerStatusUpdate.getRecord().getMapField(key).get("Result") != null)
+            {
+              if(controllerStatusUpdate.getRecord().getMapField(key).get("Result").equalsIgnoreCase("COMPLETED"))
+              {
+                completedTasksNum++;
+              }
+            }
+          }
+          Map<String, String> summary = new TreeMap<String, String>();
+          summary.put("TotalMessages:", "" + finishedTasksNum);
+          summary.put("CompletedMessages", "" + completedTasksNum);
+          
+          controllerStatusUpdate.getRecord().setMapField("Summary", summary);
+        }
+        // Update the statusUpdate of controllerMsgId
+        accessor.updateProperty(controllerStatusUpdateKey, controllerStatusUpdate);
+      }
+    }
+    
+    if(finishedTasks.getListFields().size() > 0)
+    {
+      ZNRecordDelta znDelta = new ZNRecordDelta(finishedTasks, MergeOperation.SUBTRACT);
+      List<ZNRecordDelta> deltaList = new LinkedList<ZNRecordDelta>();
+      deltaList.add(znDelta);
+      IdealState delta = new IdealState(taskQueueIdealState.getResourceName());
+      delta.setDeltaList(deltaList);
+
+      // Remove the finished (COMPLETED or ERROR) tasks from the SCHEDULER_TASK_RESOURCE idealstate
+      keyBuilder = accessor.keyBuilder();
+      accessor.updateProperty(keyBuilder.idealStates(taskQueueIdealState.getResourceName()), delta);
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 6ca206f..adb81ab 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import org.apache.helix.HelixManager;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
@@ -135,23 +136,40 @@ public class MessageGenerationPhase extends AbstractBaseStage
                 instanceName, currentState, nextState, sessionIdMap.get(instanceName),
                 stateModelDef.getId(), resource.getStateModelFactoryname(), bucketSize);
             IdealState idealState = cache.getIdealState(resourceName);
+            if(idealState!= null && idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE))
+            {
+              if(idealState.getRecord().getMapField(partition.getPartitionName())!=null)
+              {
+                message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(), idealState.getRecord().getMapField(partition.getPartitionName()));
+              }
+            }
             // Set timeout of needed
             String stateTransition = currentState + "-" + nextState + "_"
                 + Message.Attributes.TIMEOUT;
-            if (idealState != null
-                && idealState.getRecord().getSimpleField(stateTransition) != null)
+            if (idealState != null)
             {
-              try
+              String timeOutStr = idealState.getRecord().getSimpleField(stateTransition);
+              if(timeOutStr == null && idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE))
               {
-                int timeout = Integer.parseInt(idealState.getRecord().getSimpleField(
-                    stateTransition));
-                if (timeout > 0)
+                // scheduled task queue
+                if(idealState.getRecord().getMapField(partition.getPartitionName()) != null)
                 {
-                  message.setExecutionTimeout(timeout);
+                  timeOutStr = idealState.getRecord().getMapField(partition.getPartitionName()).get(Message.Attributes.TIMEOUT.toString());
                 }
-              } catch (Exception e)
+              }
+              if(timeOutStr !=null)
               {
-                logger.error("", e);
+                try
+                {
+                  int timeout = Integer.parseInt(timeOutStr);
+                  if (timeout > 0)
+                  {
+                    message.setExecutionTimeout(timeout);
+                  }
+                } catch (Exception e)
+                {
+                  logger.error("", e);
+                }
               }
             }
             message.getRecord().setSimpleField("ClusterEventName", event.getName());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
index 62a6d76..49101c9 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
@@ -20,7 +20,12 @@ package org.apache.helix.manager.zk;
  */
 
 import java.io.StringReader;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -30,6 +35,7 @@ import org.apache.helix.Criteria;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
@@ -37,6 +43,7 @@ import org.apache.helix.messaging.AsyncCallback;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.StatusUpdate;
 import org.apache.helix.model.Message.MessageType;
@@ -46,8 +53,7 @@ import org.codehaus.jackson.map.ObjectMapper;
 
 
 /*
- * TODO: The current implementation is temporary for backup handler testing only and it does not 
- * do any throttling. 
+ * The current implementation supports throttling on STATE-TRANSITION type of message, transition SCHEDULED-COMPLETED. 
  * 
  */
 public class DefaultSchedulerMessageHandlerFactory implements
@@ -55,6 +61,9 @@ public class DefaultSchedulerMessageHandlerFactory implements
 {
   public static final String WAIT_ALL = "WAIT_ALL";
   public static final String SCHEDULER_MSG_ID = "SchedulerMessageId";
+  public static final String SCHEDULER_TASK_QUEUE = "SchedulerTaskQueue";
+  public static final String CONTROLLER_MSG_ID = "controllerMsgId";
+  public static final int TASKQUEUE_BUCKET_NUM = 10;
   public static class SchedulerAsyncCallback extends AsyncCallback
   {
     StatusUpdateUtil _statusUpdateUtil = new StatusUpdateUtil();
@@ -168,6 +177,101 @@ public class DefaultSchedulerMessageHandlerFactory implements
       super(message, context);
       _manager = manager;
     }
+    
+    void handleMessageUsingScheduledTaskQueue(Criteria recipientCriteria, Message messageTemplate, String controllerMsgId)
+    {
+      HelixDataAccessor accessor = _manager.getHelixDataAccessor();  
+      Builder keyBuilder = accessor.keyBuilder();
+
+      Map<String, String> sendSummary = new HashMap<String, String>();
+      sendSummary.put("MessageCount", "0");
+      Map<InstanceType, List<Message>> messages 
+        = _manager.getMessagingService().generateMessage(recipientCriteria, messageTemplate);
+     
+      // Calculate tasks, and put them into the idealState of the SCHEDULER_TASK_QUEUE resource.
+      // List field are the destination node, while the Message parameters are stored in the mapFields
+      // task throttling can be done on SCHEDULER_TASK_QUEUE resource
+      if(messages.size() > 0)
+      {
+        String taskQueueName = _message.getRecord().getSimpleField(SCHEDULER_TASK_QUEUE);
+        if(taskQueueName == null)
+        {
+          throw new HelixException("SchedulerTaskMessage need to have " + SCHEDULER_TASK_QUEUE +" specified.");
+        }
+        IdealState newAddedScheduledTasks = new IdealState(taskQueueName);
+        newAddedScheduledTasks.setBucketSize(TASKQUEUE_BUCKET_NUM);
+        newAddedScheduledTasks.setStateModelDefRef(SCHEDULER_TASK_QUEUE);
+        
+        synchronized(_manager)
+        {
+          int existingTopPartitionId = 0;
+          IdealState currentTaskQueue =  _manager.getHelixDataAccessor()
+              .getProperty(accessor.keyBuilder().idealStates(newAddedScheduledTasks.getId()));
+          if(currentTaskQueue != null)
+          {
+            existingTopPartitionId = findTopPartitionId(currentTaskQueue) + 1;
+          }
+          
+          List<Message> taskMessages = (List<Message>)(messages.values().toArray()[0]);
+          for(Message task : taskMessages)
+          {
+            String partitionId = taskQueueName + "_" + existingTopPartitionId;
+            existingTopPartitionId++;
+            String instanceName = task.getTgtName();
+            newAddedScheduledTasks.setPartitionState(partitionId, instanceName, "COMPLETED");
+            task.getRecord().setSimpleField(instanceName, "COMPLETED");
+            task.getRecord().setSimpleField(CONTROLLER_MSG_ID, controllerMsgId);
+            
+            List<String> priorityList = new LinkedList<String>();
+            priorityList.add(instanceName);
+            newAddedScheduledTasks.getRecord().setListField(partitionId, priorityList);
+            newAddedScheduledTasks.getRecord().setMapField(partitionId, task.getRecord().getSimpleFields());
+            _logger.info("Scheduling for controllerMsg " + controllerMsgId + " , sending task " + partitionId + " " + task.getMsgId()
+                 + " to "+instanceName );
+            
+            if(_logger.isDebugEnabled())
+            {
+              _logger.debug(task.getRecord().getSimpleFields());
+            }
+          }
+          _manager.getHelixDataAccessor()
+            .updateProperty(accessor.keyBuilder().idealStates(newAddedScheduledTasks.getId()), newAddedScheduledTasks);
+          sendSummary.put("MessageCount", "" + taskMessages.size());
+        }
+      }
+      // Record the number of messages sent into scheduler message status updates
+      
+      ZNRecord statusUpdate = accessor.getProperty(
+          keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+              _message.getMsgId())).getRecord();
+      
+      statusUpdate.getMapFields().put("SentMessageCount", sendSummary);
+      accessor.updateProperty(keyBuilder.controllerTaskStatus(
+          MessageType.SCHEDULER_MSG.toString(), _message.getMsgId()),
+          new StatusUpdate(statusUpdate));
+    }
+
+    private int findTopPartitionId(IdealState currentTaskQueue)
+    {
+      int topId = 0;
+      for(String partitionName : currentTaskQueue.getPartitionSet())
+      {
+        try
+        {
+          String partitionNumStr = partitionName.substring(partitionName.lastIndexOf('_') + 1);
+          int num = Integer.parseInt(partitionNumStr);
+          if(topId < num)
+          {
+            topId = num;
+          }
+        }
+        catch(Exception e)
+        {
+          _logger.error("", e);
+        }
+      }
+      return topId;
+    }
 
     @Override
     public HelixTaskResult handleMessage() throws InterruptedException
@@ -227,8 +331,21 @@ public class DefaultSchedulerMessageHandlerFactory implements
           _logger.warn("",e);
         }
       }
-      // Send all messages.
       
+      // If the target is PARTICIPANT, use the ScheduledTaskQueue
+      if(InstanceType.PARTICIPANT == recipientCriteria.getRecipientInstanceType())
+      {
+        handleMessageUsingScheduledTaskQueue(recipientCriteria, messageTemplate, _message.getMsgId());
+        result.setSuccess(true);
+        result.getTaskResultMap().put(SCHEDULER_MSG_ID, _message.getMsgId());
+        result.getTaskResultMap().put(
+            "ControllerResult",
+            "msg " + _message.getMsgId() + " from " + _message.getMsgSrc()
+                + " processed");
+        return result;
+      }
+      
+      _logger.info("Scheduler sending message to Controller");
       int nMsgsSent = 0;
       SchedulerAsyncCallback callback = new SchedulerAsyncCallback(_message, _manager);
       if(waitAll)

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 3d13cd0..dc0f5b6 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -72,6 +72,7 @@ import org.apache.helix.monitoring.ZKPathDataDumpTask;
 import org.apache.helix.participant.DistClusterControllerElection;
 import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
 import org.apache.helix.store.ZNRecordJsonSerializer;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.tools.PropertiesReader;
@@ -792,6 +793,9 @@ public class ZKHelixManager implements HelixManager
                                                     _stateMachEngine);
     addMessageListener(_messagingService.getExecutor(), _instanceName);
     addControllerListener(_helixAccessor);
+    
+    ScheduledTaskStateModelFactory stStateModelFactory = new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
+    _stateMachEngine.registerStateModelFactory(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, stStateModelFactory);
 
     if (_participantHealthCheckInfoCollector == null)
     {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index b6812a4..558c91c 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -160,7 +160,7 @@ public class DefaultMessagingService implements ClusterMessagingService
     return totalMessageCount;
   }
 
-  private Map<InstanceType, List<Message>> generateMessage(final Criteria recipientCriteria,
+  public Map<InstanceType, List<Message>> generateMessage(final Criteria recipientCriteria,
                                                            final Message message)
   {
     Map<InstanceType, List<Message>> messagesToSendMap =

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index c674d69..4220d22 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -50,12 +50,10 @@ import org.apache.helix.MessageListener;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.NotificationContext.MapKey;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.Attributes;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.monitoring.ParticipantMonitor;
@@ -624,7 +622,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
     }
   }
 
-  private MessageHandler createMessageHandler(Message message,
+  public MessageHandler createMessageHandler(Message message,
                                               NotificationContext changeContext)
   {
     String msgType = message.getMsgType().toString();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index e0ae9aa..9d07404 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -79,7 +79,8 @@ public class Message extends HelixProperty
     RETRY_COUNT,
     STATE_MODEL_FACTORY_NAME,
     BUCKET_SIZE,
-    PARENT_MSG_ID   // used for group message mode
+    PARENT_MSG_ID,   // used for group message mode
+    INNER_MESSAGE
   }
 
   public enum MessageState

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
new file mode 100644
index 0000000..e4df933
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
@@ -0,0 +1,127 @@
+package org.apache.helix.participant.statemachine;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.messaging.handling.HelixTaskExecutor;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+
+
+public class ScheduledTaskStateModel extends StateModel
+{
+  static final String DEFAULT_INITIAL_STATE = "OFFLINE";
+  Logger logger = Logger.getLogger(ScheduledTaskStateModel.class);
+  
+  // TODO Get default state from implementation or from state model annotation
+  // StateModel with initial state other than OFFLINE should override this field
+  protected String _currentState = DEFAULT_INITIAL_STATE;
+  final ScheduledTaskStateModelFactory _factory;
+  final String _partitionName;
+
+ final HelixTaskExecutor _executor;
+
+  public ScheduledTaskStateModel(ScheduledTaskStateModelFactory factory, HelixTaskExecutor executor, String partitionName)
+  {
+   _factory = factory; 
+   _partitionName = partitionName;
+   _executor = executor;
+  }
+
+  @Transition(to="COMPLETED",from="OFFLINE")
+  public void onBecomeCompletedFromOffline(Message message,
+      NotificationContext context) throws InterruptedException
+  {
+    logger.info(_partitionName + " onBecomeCompletedFromOffline");
+    
+    // Construct the inner task message from the mapfields of scheduledTaskQueue resource group
+    Map<String, String> messageInfo = message.getRecord().getMapField(Message.Attributes.INNER_MESSAGE.toString());
+    ZNRecord record = new ZNRecord(_partitionName);
+    record.getSimpleFields().putAll(messageInfo);
+    Message taskMessage = new Message(record);
+    if(logger.isDebugEnabled())
+    {
+      logger.debug(taskMessage.getRecord().getSimpleFields());
+    }
+    MessageHandler handler = _executor.createMessageHandler(taskMessage, new NotificationContext(null));
+    if (handler == null)
+    {
+      throw new HelixException("Task message " + taskMessage.getMsgType() + " handler not found, task id " + _partitionName);
+    }
+    // Invoke the internal handler to complete the task
+    handler.handleMessage();
+    logger.info(_partitionName + " onBecomeCompletedFromOffline completed");
+  }
+  
+  @Transition(to="OFFLINE",from="COMPLETED")
+  public void onBecomeOfflineFromCompleted(Message message,
+      NotificationContext context)
+  {
+    logger.info(_partitionName + " onBecomeOfflineFromCompleted");
+  }
+
+  @Transition(to="DROPPED",from="COMPLETED")
+  public void onBecomeDroppedFromCompleted(Message message,
+      NotificationContext context)
+  {
+    logger.info(_partitionName + " onBecomeDroppedFromCompleted");
+    removeFromStatemodelFactory();
+  }
+  
+
+  @Transition(to="DROPPED",from="OFFLINE")
+  public void onBecomeDroppedFromOffline(Message message,
+      NotificationContext context) throws InterruptedException
+  {
+    logger.info(_partitionName + " onBecomeDroppedFromScheduled");
+    removeFromStatemodelFactory();
+  }
+
+  @Transition(to="OFFLINE",from="ERROR")
+  public void onBecomeOfflineFromError(Message message,
+      NotificationContext context) throws InterruptedException
+  {
+    logger.info(_partitionName + " onBecomeOfflineFromError");
+  }
+  
+  public void reset()
+  {
+    logger.info(_partitionName + " ScheduledTask reset");
+    removeFromStatemodelFactory();
+  }
+  
+  // We need this to prevent state model leak
+  private void removeFromStatemodelFactory()
+  {
+    if(_factory.getStateModelMap().containsKey(_partitionName))
+    {
+      _factory.getStateModelMap().remove(_partitionName);
+    }
+    else
+    {
+      logger.warn(_partitionName + " not found in ScheduledTaskStateModelFactory");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
new file mode 100644
index 0000000..6f6d1ed
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
@@ -0,0 +1,43 @@
+package org.apache.helix.participant.statemachine;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.messaging.handling.HelixTaskExecutor;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Factory for {@link ScheduledTaskStateModel} instances. Every state model it
+ * creates receives this factory and the {@link HelixTaskExecutor} supplied at
+ * construction time.
+ */
+public class ScheduledTaskStateModelFactory extends StateModelFactory<ScheduledTaskStateModel>
+{
+  // One logger for the class instead of one mutable field per factory instance
+  static final Logger logger = Logger.getLogger(ScheduledTaskStateModelFactory.class);
+
+  // Assigned once in the constructor and passed to every created state model
+  final HelixTaskExecutor _executor;
+
+  /**
+   * @param executor executor handed to each state model this factory creates
+   */
+  public ScheduledTaskStateModelFactory(HelixTaskExecutor executor)
+  {
+    _executor = executor;
+  }
+
+  /**
+   * Creates the state model for one task queue partition.
+   *
+   * @param partitionName name of the partition the state model is created for
+   * @return a new {@link ScheduledTaskStateModel} bound to this factory
+   */
+  @Override
+  public ScheduledTaskStateModel createNewStateModel(String partitionName)
+  {
+    logger.info("Create state model for ScheduledTask " + partitionName);
+    return new ScheduledTaskStateModel(this, _executor, partitionName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 59cb76b..bde82f6 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -162,6 +162,9 @@ public class ClusterSetup
     addStateModelDef(clusterName,
                      "OnlineOffline",
                      new StateModelDefinition(generator.generateConfigForOnlineOffline()));
+    addStateModelDef(clusterName,
+                     "ScheduledTask",
+                     new StateModelDefinition(generator.generateConfigForScheduledTaskQueue()));
   }
 
   public void activateCluster(String clusterName, String grandCluster, boolean enable)

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
index 53278e2..dada08a 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
@@ -25,8 +25,11 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.Transition;
 import org.apache.helix.model.StateModelDefinition.StateModelDefinitionProperty;
+import org.apache.helix.participant.statemachine.StateTransitionTableBuilder;
 
 
 public class StateModelConfigGenerator
@@ -348,4 +351,64 @@ public class StateModelConfigGenerator
     // ZNRecordSerializer serializer = new ZNRecordSerializer();
     // System.out.println(new String(serializer.serialize(record)));
   }
+  
+  /**
+   * Generates the state model definition record for the scheduler task queue.
+   * <p>
+   * The states are COMPLETED (count 1), OFFLINE and DROPPED (count -1 each),
+   * with OFFLINE as the initial state. The declared transitions are
+   * OFFLINE-COMPLETED, OFFLINE-DROPPED and COMPLETED-DROPPED, prioritized in
+   * that order.
+   *
+   * @return the record holding the generated definition
+   */
+  public ZNRecord generateConfigForScheduledTaskQueue()
+  {
+    ZNRecord definition = new ZNRecord(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE);
+    definition.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(), "OFFLINE");
+
+    // States in priority order
+    List<String> priorities = new ArrayList<String>();
+    priorities.add("COMPLETED");
+    priorities.add("OFFLINE");
+    priorities.add("DROPPED");
+    definition.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(), priorities);
+
+    // Per-state metadata: only COMPLETED has a count of 1, the other states use -1
+    for (String stateName : priorities)
+    {
+      Map<String, String> meta = new HashMap<String, String>();
+      meta.put("count", stateName.equals("COMPLETED") ? "1" : "-1");
+      definition.setMapField(stateName + ".meta", meta);
+    }
+
+    // Inputs for the transition table
+    List<String> tableStates = new ArrayList<String>();
+    tableStates.add("COMPLETED");
+    tableStates.add("DROPPED");
+    tableStates.add("OFFLINE");
+
+    List<Transition> allowed = new ArrayList<Transition>();
+    allowed.add(new Transition("OFFLINE", "COMPLETED"));
+    allowed.add(new Transition("OFFLINE", "DROPPED"));
+    allowed.add(new Transition("COMPLETED", "DROPPED"));
+
+    Map<String, Map<String, String>> transitionTable =
+        new StateTransitionTableBuilder().buildTransitionTable(tableStates, allowed);
+    for (String stateName : priorities)
+    {
+      definition.setMapField(stateName + ".next", transitionTable.get(stateName));
+    }
+
+    // Transitions in priority order
+    List<String> transitionPriorities = new ArrayList<String>();
+    transitionPriorities.add("OFFLINE-COMPLETED");
+    transitionPriorities.add("OFFLINE-DROPPED");
+    transitionPriorities.add("COMPLETED-DROPPED");
+    definition.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
+                            transitionPriorities);
+
+    return definition;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index b0aa4ac..d71556a 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -791,5 +791,13 @@ public class Mocks {
 			return 0;
 		}
 
+    /**
+     * Mock implementation that generates no messages.
+     *
+     * @param recipientCriteria ignored
+     * @param messageTemplate   ignored
+     * @return always {@code null}
+     */
+    @Override
+    public Map<InstanceType, List<Message>> generateMessage(
+        Criteria recipientCriteria, Message messageTemplate)
+    {
+      // Stub: callers of this mock receive null rather than an empty map
+      return null;
+    }
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 98421c3..5ee4f85 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -19,13 +19,17 @@ package org.apache.helix.integration;
  * under the License.
  */
 
+import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.helix.Criteria;
 import org.apache.helix.HelixDataAccessor;
@@ -46,6 +50,8 @@ import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.monitoring.ZKPathDataDumpTask;
 import org.apache.helix.util.HelixUtil;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.SerializationConfig;
 import org.testng.Assert;
@@ -54,6 +60,27 @@ import org.testng.annotations.Test;
 
 public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServerCheck
 {
+
+  /**
+   * Callback that remembers the most recent reply message so a test can read
+   * it from {@code _message} after sending.
+   */
+  class MockAsyncCallback extends AsyncCallback
+  {
+    // Last reply passed to onReplyMessage(); stays null until a reply arrives
+    Message _message;
+    public MockAsyncCallback()
+    {
+    }
+
+    @Override
+    public void onTimeOut()
+    {
+      // No-op: timeouts are not recorded by this callback
+
+    }
+
+    /** Stores the reply, replacing any previously stored one. */
+    @Override
+    public void onReplyMessage(Message message)
+    {
+      _message = message;
+    }
+  }
   TestMessagingHandlerFactory _factory = new TestMessagingHandlerFactory();
   public static class TestMessagingHandlerFactory implements
       MessageHandlerFactory
@@ -101,8 +128,8 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
                 new ConcurrentSkipListSet<String>());
           }
         }
-        _results.get(_message.getPartitionName()).add(destName);
-
+        _results.get(_message.getPartitionName()).add(_message.getMsgId());
+        //System.err.println("Message " + _message.getMsgId() + " executed");
         return result;
       }
 
@@ -110,10 +137,77 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
       public void onError(Exception e, ErrorCode code, ErrorType type)
       {
         // TODO Auto-generated method stub
+      }
+    }
+  }
+  
+  /**
+   * Handler factory whose handlers block on a latch until the test calls
+   * {@link #signal()}, letting a test control when message handling proceeds.
+   */
+  public static class TestMessagingHandlerFactoryLatch implements
+    MessageHandlerFactory
+  {
+    // Latch the handlers wait on; signal() replaces it with a fresh one
+    public volatile CountDownLatch _latch = new CountDownLatch(1);
+    // Number of handlers created so far
+    public int _messageCount = 0;
+    // Partition name -> target names of the messages handled for that partition
+    public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
+    /** Counts the request and returns a new latch-gated handler for the message. */
+    @Override
+    public synchronized MessageHandler createHandler(Message message,
+      NotificationContext context)
+    {
+      _messageCount++;
+      return new TestMessagingHandlerLatch(message, context);
+    }
+    
+    /**
+     * Releases the handlers waiting on the current latch, then installs a new
+     * latch so that handlers arriving afterwards wait for the next call.
+     */
+    public synchronized void signal()
+    {
+      _latch.countDown();
+      _latch = new CountDownLatch(1);
+    }
+  
+    @Override
+    public String getMessageType()
+    {
+      return "TestMessagingHandlerLatch";
+    }
+  
+    @Override
+    public void reset()
+    {
+      // TODO Auto-generated method stub
+    }
 
+  /**
+   * Handler that waits on the factory's latch before reporting success and
+   * recording the message's target name under its partition name.
+   */
+  public class TestMessagingHandlerLatch extends MessageHandler
+  {
+    public TestMessagingHandlerLatch(Message message, NotificationContext context)
+    {
+      super(message, context);
+      // TODO Auto-generated constructor stub
+    }
+  
+    @Override
+    public HelixTaskResult handleMessage() throws InterruptedException
+    {
+      // Block until the test calls signal() on the enclosing factory
+      _latch.await();
+      HelixTaskResult result = new HelixTaskResult();
+      result.setSuccess(true);
+      String destName = _message.getTgtName();
+      // Create the per-partition result set under a lock so only one set is installed
+      synchronized (_results)
+      {
+        if (!_results.containsKey(_message.getPartitionName()))
+        {
+          _results.put(_message.getPartitionName(),
+              new ConcurrentSkipListSet<String>());
+        }
       }
+      _results.get(_message.getPartitionName()).add(destName);
+      //System.err.println("Message " + _message.getMsgId() + " executed");
+      return result;
+    }
+  
+    @Override
+    public void onError(Exception e, ErrorCode code, ErrorType type)
+    {
+      // TODO Auto-generated method stub
     }
   }
+}
 
   @Test()
   public void TestSchedulerMsg() throws Exception
@@ -134,7 +228,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     schedulerMessage.setTgtName("CONTROLLER");
     // TODO: change it to "ADMIN" ?
     schedulerMessage.setSrcName("CONTROLLER");
-
+    schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg");
     // Template for the individual message sent to each participant
     Message msg = new Message(_factory.getMessageType(), "Template");
     msg.setTgtSessionId("*");
@@ -167,8 +261,6 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     helixDataAccessor.createProperty(
         keyBuilder.controllerMessage(schedulerMessage.getMsgId()),
         schedulerMessage);
-
-    Thread.sleep(15000);
     
     for(int i = 0; i < 30; i++)
     {
@@ -323,35 +415,27 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
     cr2.setInstanceName("*");
     cr2.setSessionSpecific(false);
-    
-    class MockAsyncCallback extends AsyncCallback
-    {
-      Message _message;
-      public MockAsyncCallback()
-      {
-      }
 
-      @Override
-      public void onTimeOut()
-      {
-        // TODO Auto-generated method stub
-
-      }
-
-      @Override
-      public void onReplyMessage(Message message)
-      {
-        _message = message;
-      }
-
-    }
+    schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg2");
     MockAsyncCallback callback = new MockAsyncCallback();
     manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
     String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
     
     HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
     Builder keyBuilder = helixDataAccessor.keyBuilder();
-
+    for(int i = 0;i < 10; i++ )
+    {
+      Thread.sleep(200);
+      PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+          MessageType.SCHEDULER_MSG.toString(), msgId);
+      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+          .getRecord();
+      if(statusUpdate.getMapFields().containsKey("Summary"))
+      {
+        break;
+      }
+    }
+    
     Assert.assertEquals(_PARTITIONS, _factory._results.size());
     PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
         MessageType.SCHEDULER_MSG.toString(), msgId);
@@ -446,4 +530,447 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     }
     Assert.assertEquals(count, 0);
   }
+  
+
+  @Test()
+  public void TestSchedulerMsg3() throws Exception
+  {
+    _factory._results.clear();
+    HelixManager manager = null;
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String hostDest = "localhost_" + (START_PORT + i);
+      _startCMResultMap.get(hostDest)._manager.getMessagingService()
+          .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
+      // 
+      _startCMResultMap.get(hostDest)._manager.getMessagingService()
+      .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
+      manager = _startCMResultMap.get(hostDest)._manager;
+    }
+
+    Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
+        .randomUUID().toString());
+    schedulerMessage.setTgtSessionId("*");
+    schedulerMessage.setTgtName("CONTROLLER");
+    // TODO: change it to "ADMIN" ?
+    schedulerMessage.setSrcName("CONTROLLER");
+
+    // Template for the individual message sent to each participant
+    Message msg = new Message(_factory.getMessageType(), "Template");
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+
+    // Criteria to send individual messages
+    Criteria cr = new Criteria();
+    cr.setInstanceName("localhost_%");
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setResource("%");
+    cr.setPartition("%");
+
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    StringWriter sw = new StringWriter();
+    mapper.writeValue(sw, cr);
+
+    String crString = sw.toString();
+
+    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+    schedulerMessage.getRecord().setMapField("MessageTemplate",
+        msg.getRecord().getSimpleFields());
+    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+    schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
+
+    schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg3");
+    Criteria cr2 = new Criteria();
+    cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
+    cr2.setInstanceName("*");
+    cr2.setSessionSpecific(false);
+    
+    MockAsyncCallback callback = new MockAsyncCallback();
+    cr.setInstanceName("localhost_%");
+    mapper = new ObjectMapper();
+    serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    sw = new StringWriter();
+    mapper.writeValue(sw, cr);
+
+    crString = sw.toString();
+    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+    
+    for(int i = 0; i < 4; i++)
+    {
+      callback = new MockAsyncCallback();
+      cr.setInstanceName("localhost_"+(START_PORT + i));
+      mapper = new ObjectMapper();
+      serializationConfig = mapper.getSerializationConfig();
+      serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+      sw = new StringWriter();
+      mapper.writeValue(sw, cr);
+      schedulerMessage.setMsgId(UUID.randomUUID().toString());
+      crString = sw.toString();
+      schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+      manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
+      String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
+      
+      HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+      Builder keyBuilder = helixDataAccessor.keyBuilder();
+      
+      for(int j = 0;j < 100; j++ )
+      {
+        Thread.sleep(200);
+        PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+            MessageType.SCHEDULER_MSG.toString(), msgId);
+        ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+            .getRecord();
+        if(statusUpdate.getMapFields().containsKey("Summary"))
+        {
+          break;
+        }
+      }
+      
+      PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+          MessageType.SCHEDULER_MSG.toString(), msgId);
+      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+          .getRecord();
+      Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
+          .get("MessageCount").equals("" + (_PARTITIONS * 3 / 5)));
+      int messageResultCount = 0;
+      for(String key : statusUpdate.getMapFields().keySet())
+      {
+        if(key.startsWith("MessageResult"))
+        {
+          messageResultCount ++;
+        }
+      }
+      Assert.assertEquals(messageResultCount, _PARTITIONS * 3 / 5);
+      
+      int count = 0;
+      //System.out.println(i);
+      for (Set<String> val : _factory._results.values())
+      {
+        //System.out.println(val);
+        count += val.size();
+      }
+      //System.out.println(count);
+      Assert.assertEquals(count, _PARTITIONS * 3/ 5 *(i+1) );
+    }
+  }
+  
+
+  /**
+   * Adds message constraint "constraint1" (value 1 on OFFLINE-COMPLETED state
+   * transitions for all instances), then sends one scheduler message addressed
+   * to all participants followed by one per participant, all through the
+   * "TestSchedulerMsg4" task queue, and verifies that every message is handled.
+   */
+  @Test()
+  public void TestSchedulerMsg4() throws Exception
+  {
+    _factory._results.clear();
+    HelixManager manager = null;
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String hostDest = "localhost_" + (START_PORT + i);
+      // One registration per participant is enough
+      _startCMResultMap.get(hostDest)._manager.getMessagingService()
+          .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
+      manager = _startCMResultMap.get(hostDest)._manager;
+    }
+
+    Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
+        .randomUUID().toString());
+    schedulerMessage.setTgtSessionId("*");
+    schedulerMessage.setTgtName("CONTROLLER");
+    // TODO: change it to "ADMIN" ?
+    schedulerMessage.setSrcName("CONTROLLER");
+
+    // Template for the individual message sent to each participant
+    Message msg = new Message(_factory.getMessageType(), "Template");
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+
+    // Criteria to send individual messages
+    Criteria cr = new Criteria();
+    cr.setInstanceName("localhost_%");
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setResource("TestDB");
+    cr.setPartition("%");
+
+    // A single mapper is reused for every serialization of the criteria
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    StringWriter sw = new StringWriter();
+    mapper.writeValue(sw, cr);
+
+    schedulerMessage.getRecord().setSimpleField("Criteria", sw.toString());
+    schedulerMessage.getRecord().setMapField("MessageTemplate",
+        msg.getRecord().getSimpleFields());
+    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+    schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
+    schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg4");
+
+    // Criteria used to deliver the scheduler message itself to the controller
+    Criteria cr2 = new Criteria();
+    cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
+    cr2.setInstanceName("*");
+    cr2.setSessionSpecific(false);
+
+    Map<String, String> constraints = new TreeMap<String, String>();
+    constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
+    constraints.put("TRANSITION", "OFFLINE-COMPLETED");
+    constraints.put("CONSTRAINT_VALUE", "1");
+    constraints.put("INSTANCE", ".*");
+    manager.getClusterManagmentTool().addMessageConstraint(manager.getClusterName(), "constraint1", constraints);
+
+    // First message: addressed to every participant (criteria serialized above)
+    MockAsyncCallback callback = new MockAsyncCallback();
+    manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
+    String msgIdPrime = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
+
+    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = helixDataAccessor.keyBuilder();
+
+    // Then one message per participant, all queued before any result is checked
+    List<String> msgIds = new ArrayList<String>();
+    for(int i = 0; i < NODE_NR; i++)
+    {
+      callback = new MockAsyncCallback();
+      cr.setInstanceName("localhost_"+(START_PORT + i));
+      sw = new StringWriter();
+      mapper.writeValue(sw, cr);
+      schedulerMessage.setMsgId(UUID.randomUUID().toString());
+      schedulerMessage.getRecord().setSimpleField("Criteria", sw.toString());
+      manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
+      msgIds.add(callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID));
+    }
+
+    for(String msgId : msgIds)
+    {
+      PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+          MessageType.SCHEDULER_MSG.toString(), msgId);
+
+      // Poll (up to 100 x 200ms) until the status update contains a "Summary" entry
+      for(int j = 0;j < 100; j++ )
+      {
+        Thread.sleep(200);
+        ZNRecord polled = helixDataAccessor.getProperty(controllerTaskStatus)
+            .getRecord();
+        if(polled.getMapFields().containsKey("Summary"))
+        {
+          //System.err.println(msgId+" done");
+          break;
+        }
+      }
+
+      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+          .getRecord();
+      // assertEquals reports the actual value on failure, unlike assertTrue(equals(...))
+      Assert.assertEquals(statusUpdate.getMapField("SentMessageCount").get("MessageCount"),
+          "" + (_PARTITIONS * 3 / 5));
+      int messageResultCount = 0;
+      for(String key : statusUpdate.getMapFields().keySet())
+      {
+        if(key.startsWith("MessageResult"))
+        {
+          messageResultCount ++;
+        }
+      }
+      Assert.assertEquals(messageResultCount, _PARTITIONS * 3 / 5);
+    }
+
+    // Wait for the first (all-participant) message to be summarized as well
+    PropertyKey primeTaskStatus = keyBuilder.controllerTaskStatus(
+        MessageType.SCHEDULER_MSG.toString(), msgIdPrime);
+    for(int j = 0;j < 100; j++ )
+    {
+      Thread.sleep(200);
+      ZNRecord polled = helixDataAccessor.getProperty(primeTaskStatus)
+          .getRecord();
+      if(polled.getMapFields().containsKey("Summary"))
+      {
+        break;
+      }
+    }
+
+    // The first message and the per-participant messages together are expected
+    // to account for _PARTITIONS * 3 handled messages each
+    int count = 0;
+    for (Set<String> val : _factory._results.values())
+    {
+      count += val.size();
+    }
+    Assert.assertEquals(count, _PARTITIONS * 3 * 2 );
+  }
+  
+  
+  /**
+   * Sends one scheduler message through the "TestSchedulerMsgContraints" task
+   * queue while a message constraint limits OFFLINE-COMPLETED state transitions
+   * to one per instance, and checks that the participants receive the tasks in
+   * batches rather than all at once.
+   *
+   * Flow:
+   * 1. register a latch-based handler factory on every participant;
+   * 2. install the constraint "constraint1";
+   * 3. send the scheduler message to the controller and remember its id;
+   * 4. repeatedly wait for the next batch, assert the running message count,
+   *    then call factory.signal() so the next batch can proceed;
+   * 5. wait for the "Summary" status update and verify the totals.
+   *
+   * The constraint is reset in a finally block so that a failed assertion does
+   * not leave it installed for the tests that run afterwards.
+   */
+  @Test
+  public void TestSchedulerMsgContraints() throws JsonGenerationException, JsonMappingException, IOException, InterruptedException
+  {
+    TestMessagingHandlerFactoryLatch factory = new TestMessagingHandlerFactoryLatch();
+    HelixManager manager = null;
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String hostDest = "localhost_" + (START_PORT + i);
+      // Register the handler factory once per participant; the last
+      // participant's manager is reused below to talk to the cluster.
+      manager = _startCMResultMap.get(hostDest)._manager;
+      manager.getMessagingService()
+          .registerMessageHandlerFactory(factory.getMessageType(), factory);
+    }
+
+    Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
+        .randomUUID().toString());
+    schedulerMessage.setTgtSessionId("*");
+    schedulerMessage.setTgtName("CONTROLLER");
+    // TODO: change it to "ADMIN" ?
+    schedulerMessage.setSrcName("CONTROLLER");
+
+    // Template for the individual message sent to each participant
+    Message msg = new Message(factory.getMessageType(), "Template");
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
+
+    // Criteria to send individual messages
+    Criteria cr = new Criteria();
+    cr.setInstanceName("localhost_%");
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setResource("%");
+    cr.setPartition("%");
+
+    // The criteria travel inside the scheduler message as a JSON string.
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    StringWriter sw = new StringWriter();
+    mapper.writeValue(sw, cr);
+    String crString = sw.toString();
+
+    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+    schedulerMessage.getRecord().setMapField("MessageTemplate",
+        msg.getRecord().getSimpleFields());
+    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+    schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
+    schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgContraints");
+
+    // Criteria used to deliver the scheduler message itself to the controller
+    Criteria cr2 = new Criteria();
+    cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
+    cr2.setInstanceName("*");
+    cr2.setSessionSpecific(false);
+
+    MockAsyncCallback callback = new MockAsyncCallback();
+
+    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = helixDataAccessor.keyBuilder();
+
+    // Set contraints that only 1 msg per participant
+    Map<String, String> constraints = new TreeMap<String, String>();
+    constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
+    constraints.put("TRANSITION", "OFFLINE-COMPLETED");
+    constraints.put("CONSTRAINT_VALUE", "1");
+    constraints.put("INSTANCE", ".*");
+    manager.getClusterManagmentTool().addMessageConstraint(manager.getClusterName(), "constraint1", constraints);
+
+    try
+    {
+      // Send scheduler message
+      manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
+      String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
+
+      // Wait (up to ~2s) for the controller to report how many messages it sent.
+      for (int j = 0; j < 10; j++)
+      {
+        Thread.sleep(200);
+        PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+            MessageType.SCHEDULER_MSG.toString(), msgId);
+        ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+            .getRecord();
+        if (statusUpdate.getMapFields().containsKey("SentMessageCount"))
+        {
+          Assert.assertEquals(statusUpdate.getMapFields().get("SentMessageCount").get("MessageCount"), ""+(_PARTITIONS * 3));
+          break;
+        }
+      }
+
+      // Each round expects exactly 5 more handled messages than the previous
+      // one (presumably one per participant -- confirm against NODE_NR), then
+      // signals the factory so the next batch can run.
+      for (int i = 0; i < _PARTITIONS * 3 / 5; i++)
+      {
+        // Bounded wait on the inner index j: the loop condition previously
+        // tested the outer index i, so it either never terminated on its own
+        // or was skipped entirely.
+        for (int j = 0; j < 10; j++)
+        {
+          Thread.sleep(300);
+          if (factory._messageCount == 5*(i+1)) break;
+        }
+        // Extra settle time: the count must not overshoot the current batch.
+        Thread.sleep(300);
+        Assert.assertEquals(factory._messageCount, 5*(i+1));
+        factory.signal();
+        //System.err.println(i);
+      }
+
+      // Wait (up to ~2s) for the final "Summary" entry of the scheduler task.
+      for (int j = 0; j < 10; j++)
+      {
+        Thread.sleep(200);
+        PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+            MessageType.SCHEDULER_MSG.toString(), msgId);
+        ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+            .getRecord();
+        if (statusUpdate.getMapFields().containsKey("Summary"))
+        {
+          break;
+        }
+      }
+
+      Assert.assertEquals(_PARTITIONS, factory._results.size());
+      PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+          MessageType.SCHEDULER_MSG.toString(), msgId);
+      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+          .getRecord();
+      Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
+          .get("MessageCount").equals("" + (_PARTITIONS * 3)));
+
+      // Count the per-message result entries recorded in the status update.
+      int messageResultCount = 0;
+      for (String key : statusUpdate.getMapFields().keySet())
+      {
+        if (key.startsWith("MessageResult "))
+        {
+          messageResultCount ++;
+        }
+      }
+      Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+
+      // Total number of results collected by the handler factory.
+      int count = 0;
+      for (Set<String> val : factory._results.values())
+      {
+        count += val.size();
+      }
+      Assert.assertEquals(count, _PARTITIONS * 3);
+    }
+    finally
+    {
+      // Overwrite "constraint1" with an empty constraint even when an
+      // assertion above failed, so it does not affect later tests.
+      manager.getClusterManagmentTool().addMessageConstraint(manager.getClusterName(), "constraint1", new TreeMap<String, String>());
+    }
+  }
 }