You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by sl...@apache.org on 2013/02/15 23:20:13 UTC
[1/2] git commit: Helix task queue work. Scheduler messages that are
sent to the controller are put in a task queue resource group, and the actual tasks
are done in a ScheduledTask state model. Message handling constraints can be
done at resource level, cluster level, ... (subject truncated)
Helix task queue work. Scheduler messages that are sent to the controller are
put in a task queue resource group, and the actual tasks are executed by the
ScheduledTask state model. Message handling constraints can be applied at the
resource level or the cluster level, against the OFFLINE-COMPLETED state
transition type.
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/5e8a912b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/5e8a912b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/5e8a912b
Branch: refs/heads/master
Commit: 5e8a912bd4f4c83ad84cb8f92d3f4acca56972cf
Parents: 88cd214
Author: slu2011 <lu...@gmail.com>
Authored: Fri Feb 15 14:19:24 2013 -0800
Committer: slu2011 <lu...@gmail.com>
Committed: Fri Feb 15 14:19:24 2013 -0800
----------------------------------------------------------------------
.../webapp/resources/SchedulerTasksResource.java | 18 +-
.../org/apache/helix/ClusterMessagingService.java | 13 +
.../stages/ExternalViewComputeStage.java | 154 ++++-
.../controller/stages/MessageGenerationPhase.java | 36 +-
.../zk/DefaultSchedulerMessageHandlerFactory.java | 123 +++-
.../apache/helix/manager/zk/ZKHelixManager.java | 4 +
.../helix/messaging/DefaultMessagingService.java | 2 +-
.../messaging/handling/HelixTaskExecutor.java | 4 +-
.../main/java/org/apache/helix/model/Message.java | 3 +-
.../statemachine/ScheduledTaskStateModel.java | 127 ++++
.../ScheduledTaskStateModelFactory.java | 43 ++
.../java/org/apache/helix/tools/ClusterSetup.java | 3 +
.../helix/tools/StateModelConfigGenerator.java | 63 ++
.../src/test/java/org/apache/helix/Mocks.java | 8 +
.../helix/integration/TestSchedulerMessage.java | 581 ++++++++++++++-
15 files changed, 1129 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
index deb498b..097771e 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
@@ -31,6 +31,7 @@ import org.apache.helix.HelixException;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
@@ -62,6 +63,7 @@ public class SchedulerTasksResource extends Resource
public static String CRITERIA = "Criteria";
public static String MESSAGETEMPLATE = "MessageTemplate";
+ public static String TASKQUEUENAME = "TaskQueueName";
public SchedulerTasksResource(Context context,
Request request,
Response response)
@@ -152,17 +154,27 @@ public class SchedulerTasksResource extends Resource
{
throw new HelixException("SchedulerTasksResource need to have Criteria specified.");
}
+ HelixDataAccessor accessor = ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
+ LiveInstance leader = accessor.getProperty(accessor.keyBuilder().controllerLeader());
+ if(leader == null)
+ {
+ throw new HelixException("There is no leader for the cluster " + clusterName);
+ }
Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG, UUID.randomUUID().toString());
schedulerMessage.getRecord().getSimpleFields().put(CRITERIA, criteriaString);
schedulerMessage.getRecord().getMapFields().put(MESSAGETEMPLATE, messageTemplate);
- schedulerMessage.setTgtSessionId("*");
+ schedulerMessage.setTgtSessionId(leader.getSessionId());
schedulerMessage.setTgtName("CONTROLLER");
schedulerMessage.setSrcInstanceType(InstanceType.CONTROLLER);
-
- HelixDataAccessor accessor = ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
+ // The task queue name is mandatory: reject the request when the form does not supply it.
+ String taskQueueName = ClusterRepresentationUtil.getFormJsonParameterString(form, TASKQUEUENAME);
+ if(taskQueueName == null)
+ {
+ throw new HelixException("SchedulerTasksResource need to have " + TASKQUEUENAME +" specified.");
+ }
+ // Forward the queue name on the scheduler message under the SCHEDULER_TASK_QUEUE field.
+ schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, taskQueueName);
accessor.setProperty(accessor.keyBuilder().controllerMessage(schedulerMessage.getMsgId()), schedulerMessage);
Map<String, String> resultMap = new HashMap<String, String>();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
index 999c05b..dc4a0ad 100644
--- a/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/ClusterMessagingService.java
@@ -19,6 +19,9 @@ package org.apache.helix;
* under the License.
*/
+import java.util.List;
+import java.util.Map;
+
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
@@ -114,5 +117,15 @@ public interface ClusterMessagingService
public void registerMessageHandlerFactory(String type,
MessageHandlerFactory factory);
+ /**
+ * This will generate all messages to be sent given the recipientCriteria and MessageTemplate,
+ * the messages are not sent.
+ *
+ * @param recipientCriteria criteria describing the recipients the messages are generated for
+ * @param messageTemplate template message the generated messages are based on
+ * @return the generated messages, keyed by instance type (presumably the instance type of the recipients)
+ */
+ public Map<InstanceType, List<Message>> generateMessage(final Criteria recipientCriteria,
+ final Message messageTemplate);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index a691a25..ec57c17 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -20,17 +20,30 @@ package org.apache.helix.controller.stages;
*/
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordDelta;
+import org.apache.helix.ZNRecordDelta.MergeOperation;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
+import org.apache.helix.model.StatusUpdate;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.log4j.Logger;
@@ -101,12 +114,15 @@ public class ExternalViewComputeStage extends AbstractBaseStage
// Update cluster status monitor mbean
ClusterStatusMonitor clusterStatusMonitor =
(ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
- if (clusterStatusMonitor != null)
+ IdealState idealState = cache._idealStateMap.get(view.getResourceName());
+ if(idealState != null)
{
- clusterStatusMonitor.onExternalViewChange(view,
+ if (clusterStatusMonitor != null && !idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE))
+ {
+ clusterStatusMonitor.onExternalViewChange(view,
cache._idealStateMap.get(view.getResourceName()));
+ }
}
-
// compare the new external view with current one, set only on different
Map<String, ExternalView> curExtViews =
dataAccessor.getChildValuesMap(manager.getHelixDataAccessor()
@@ -118,11 +134,20 @@ public class ExternalViewComputeStage extends AbstractBaseStage
{
keys.add(manager.getHelixDataAccessor().keyBuilder().externalView(resourceName));
newExtViews.add(view);
- // dataAccessor.setProperty(PropertyType.EXTERNALVIEW, view,
- // resourceName);
+ // dataAccessor.setProperty(PropertyType.EXTERNALVIEW, view, resourceName);
+
+ // For SCHEDULER_TASK_RESOURCE resource group (helix task queue), we need to find out which task
+ // partitions are finished (COMPLETED or ERROR), update the status update of the original scheduler
+ // message, and then remove the partitions from the ideal state
+ // idealState may be null (no ideal state cached for this resource), so guard before dereferencing it
+ if(idealState != null && idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE))
+ {
+ updateScheduledTaskStatus(view, manager, idealState);
+ }
}
}
-
+ // TODO: consider not setting the externalview of SCHEDULER_TASK_QUEUE at all.
+ // Are there any entity that will be interested in its change?
+
if (newExtViews.size() > 0)
{
dataAccessor.setChildren(keys, newExtViews);
@@ -132,5 +157,122 @@ public class ExternalViewComputeStage extends AbstractBaseStage
log.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime)
+ " ms");
}
+
+ /**
+ * Processes the finished tasks of a scheduler task queue resource.
+ *
+ * For every partition of the external view that has a replica in state ERROR or COMPLETED, this
+ * method records the task result in the status update of the controller (scheduler) message the
+ * task originated from, and then submits a SUBTRACT delta for the finished partitions against the
+ * task queue ideal state. A "Summary" entry is added to a controller message's status update when
+ * the number of its tasks found finished in this pass equals the number of its tasks that are
+ * present in the ideal state.
+ *
+ * @param ev external view of the task queue resource
+ * @param manager manager used to obtain the data accessor
+ * @param taskQueueIdealState ideal state of the task queue; the per-partition map fields are read
+ * for the originating controller message id, the target name and the message id
+ */
+ private void updateScheduledTaskStatus(ExternalView ev, HelixManager manager, IdealState taskQueueIdealState)
+ {
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ // Collects the finished partitions; used below as the SUBTRACT delta against the ideal state
+ ZNRecord finishedTasks = new ZNRecord(ev.getResourceName());
+
+ // Place holder for finished partitions
+ Map<String, String> emptyMap = new HashMap<String, String>();
+ List<String> emptyList = new LinkedList<String>();
+
+ // controllerMsgId -> number of ideal state partitions that carry this controller message id
+ Map<String, Integer> controllerMsgIdCountMap = new HashMap<String, Integer>();
+ // controllerMsgId -> (task partition name -> final state) for tasks found finished in this pass
+ Map<String, Map<String, String>> controllerMsgUpdates = new HashMap<String, Map<String, String>>();
+
+ Builder keyBuilder = accessor.keyBuilder();
+
+ for(String taskPartitionName : ev.getPartitionSet())
+ {
+ // A partition is treated as finished when any of its replicas reports ERROR or COMPLETED
+ for(String taskState : ev.getStateMap(taskPartitionName).values())
+ {
+ if(taskState.equalsIgnoreCase("ERROR") || taskState.equalsIgnoreCase("COMPLETED"))
+ {
+ log.info(taskPartitionName + " finished as " + taskState);
+ finishedTasks.getListFields().put(taskPartitionName, emptyList);
+ finishedTasks.getMapFields().put(taskPartitionName, emptyMap);
+
+ // Update original scheduler message status update
+ if(taskQueueIdealState.getRecord().getMapField(taskPartitionName) != null)
+ {
+ String controllerMsgId
+ = taskQueueIdealState.getRecord().getMapField(taskPartitionName).get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
+ if(controllerMsgId != null)
+ {
+ log.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
+ if(!controllerMsgUpdates.containsKey(controllerMsgId))
+ {
+ controllerMsgUpdates.put(controllerMsgId, new HashMap<String, String>());
+ }
+ controllerMsgUpdates.get(controllerMsgId).put(taskPartitionName, taskState);
+ }
+ }
+ }
+ }
+ }
+ // fill the controllerMsgIdCountMap
+ // NOTE(review): getMapField(taskId) is dereferenced without a null check here, unlike in the loop above - confirm every partition has a map field
+ for(String taskId : taskQueueIdealState.getPartitionSet())
+ {
+ String controllerMsgId
+ = taskQueueIdealState.getRecord().getMapField(taskId).get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
+ if(controllerMsgId != null)
+ {
+ if(!controllerMsgIdCountMap.containsKey(controllerMsgId))
+ {
+ controllerMsgIdCountMap.put(controllerMsgId, 0);
+ }
+ controllerMsgIdCountMap.put(controllerMsgId, (controllerMsgIdCountMap.get(controllerMsgId) + 1));
+ }
+ }
+
+ if(controllerMsgUpdates.size() > 0)
+ {
+ for(String controllerMsgId : controllerMsgUpdates.keySet())
+ {
+ PropertyKey controllerStatusUpdateKey
+ = keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), controllerMsgId);
+ // NOTE(review): the result of getProperty is used without a null check - confirm the status update always exists at this point
+ StatusUpdate controllerStatusUpdate = accessor.getProperty(controllerStatusUpdateKey);
+ // Record one "MessageResult <target name> <partition> <message id>" map field per finished task
+ for(String taskPartitionName : controllerMsgUpdates.get(controllerMsgId).keySet())
+ {
+ Map<String, String> result = new HashMap<String, String>();
+ result.put("Result", controllerMsgUpdates.get(controllerMsgId).get(taskPartitionName));
+ controllerStatusUpdate.getRecord().setMapField("MessageResult " +
+ taskQueueIdealState.getRecord().getMapField(taskPartitionName).get(Message.Attributes.TGT_NAME.toString()) + " " + taskPartitionName + " " +
+ taskQueueIdealState.getRecord().getMapField(taskPartitionName).get(Message.Attributes.MSG_ID.toString())
+ , result);
+ }
+ // All done for the scheduled tasks that came from controllerMsgId, add summary for it
+ if(controllerMsgUpdates.get(controllerMsgId).size() == controllerMsgIdCountMap.get(controllerMsgId).intValue())
+ {
+ int finishedTasksNum = 0;
+ int completedTasksNum = 0;
+ // Counts over all map fields of the status update, i.e. entries read from the store plus those added above
+ for(String key : controllerStatusUpdate.getRecord().getMapFields().keySet())
+ {
+ if(key.startsWith("MessageResult "))
+ {
+ finishedTasksNum ++;
+ }
+ // NOTE(review): the "Result" check applies to every map field, not only to "MessageResult " entries
+ if(controllerStatusUpdate.getRecord().getMapField(key).get("Result") != null)
+ {
+ if(controllerStatusUpdate.getRecord().getMapField(key).get("Result").equalsIgnoreCase("COMPLETED"))
+ {
+ completedTasksNum++;
+ }
+ }
+ }
+ Map<String, String> summary = new TreeMap<String, String>();
+ // NOTE(review): the "TotalMessages:" key ends with a colon while "CompletedMessages" does not - confirm readers expect this
+ summary.put("TotalMessages:", "" + finishedTasksNum);
+ summary.put("CompletedMessages", "" + completedTasksNum);
+
+ controllerStatusUpdate.getRecord().setMapField("Summary", summary);
+ }
+ // Update the statusUpdate of controllerMsgId
+ accessor.updateProperty(controllerStatusUpdateKey, controllerStatusUpdate);
+ }
+ }
+
+ if(finishedTasks.getListFields().size() > 0)
+ {
+ // Wrap the finished partitions in a SUBTRACT delta carried by an otherwise empty ideal state
+ ZNRecordDelta znDelta = new ZNRecordDelta(finishedTasks, MergeOperation.SUBTRACT);
+ List<ZNRecordDelta> deltaList = new LinkedList<ZNRecordDelta>();
+ deltaList.add(znDelta);
+ IdealState delta = new IdealState(taskQueueIdealState.getResourceName());
+ delta.setDeltaList(deltaList);
+
+ // Remove the finished (COMPLETED or ERROR) tasks from the SCHEDULER_TASK_RESOURCE idealstate
+ keyBuilder = accessor.keyBuilder();
+ accessor.updateProperty(keyBuilder.idealStates(taskQueueIdealState.getResourceName()), delta);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 6ca206f..adb81ab 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import org.apache.helix.HelixManager;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
@@ -135,23 +136,40 @@ public class MessageGenerationPhase extends AbstractBaseStage
instanceName, currentState, nextState, sessionIdMap.get(instanceName),
stateModelDef.getId(), resource.getStateModelFactoryname(), bucketSize);
IdealState idealState = cache.getIdealState(resourceName);
+ if(idealState!= null && idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE))
+ {
+ if(idealState.getRecord().getMapField(partition.getPartitionName())!=null)
+ {
+ message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(), idealState.getRecord().getMapField(partition.getPartitionName()));
+ }
+ }
// Set timeout of needed
String stateTransition = currentState + "-" + nextState + "_"
+ Message.Attributes.TIMEOUT;
- if (idealState != null
- && idealState.getRecord().getSimpleField(stateTransition) != null)
+ if (idealState != null)
{
- try
+ String timeOutStr = idealState.getRecord().getSimpleField(stateTransition);
+ if(timeOutStr == null && idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE))
{
- int timeout = Integer.parseInt(idealState.getRecord().getSimpleField(
- stateTransition));
- if (timeout > 0)
+ // scheduled task queue
+ if(idealState.getRecord().getMapField(partition.getPartitionName()) != null)
{
- message.setExecutionTimeout(timeout);
+ timeOutStr = idealState.getRecord().getMapField(partition.getPartitionName()).get(Message.Attributes.TIMEOUT.toString());
}
- } catch (Exception e)
+ }
+ if(timeOutStr !=null)
{
- logger.error("", e);
+ try
+ {
+ int timeout = Integer.parseInt(timeOutStr);
+ if (timeout > 0)
+ {
+ message.setExecutionTimeout(timeout);
+ }
+ } catch (Exception e)
+ {
+ logger.error("", e);
+ }
}
}
message.getRecord().setSimpleField("ClusterEventName", event.getName());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
index 62a6d76..49101c9 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
@@ -20,7 +20,12 @@ package org.apache.helix.manager.zk;
*/
import java.io.StringReader;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
@@ -30,6 +35,7 @@ import org.apache.helix.Criteria;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
@@ -37,6 +43,7 @@ import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.StatusUpdate;
import org.apache.helix.model.Message.MessageType;
@@ -46,8 +53,7 @@ import org.codehaus.jackson.map.ObjectMapper;
/*
- * TODO: The current implementation is temporary for backup handler testing only and it does not
- * do any throttling.
+ * The current implementation supports throttling on STATE-TRANSITION type messages for the OFFLINE-COMPLETED transition.
*
*/
public class DefaultSchedulerMessageHandlerFactory implements
@@ -55,6 +61,9 @@ public class DefaultSchedulerMessageHandlerFactory implements
{
public static final String WAIT_ALL = "WAIT_ALL";
public static final String SCHEDULER_MSG_ID = "SchedulerMessageId";
+ public static final String SCHEDULER_TASK_QUEUE = "SchedulerTaskQueue";
+ public static final String CONTROLLER_MSG_ID = "controllerMsgId";
+ public static final int TASKQUEUE_BUCKET_NUM = 10;
public static class SchedulerAsyncCallback extends AsyncCallback
{
StatusUpdateUtil _statusUpdateUtil = new StatusUpdateUtil();
@@ -168,6 +177,101 @@ public class DefaultSchedulerMessageHandlerFactory implements
super(message, context);
_manager = manager;
}
+
+ /**
+ * Queues the messages generated for a scheduler message as tasks of a scheduler task queue
+ * resource instead of sending them directly.
+ *
+ * Each generated message becomes one partition named taskQueueName_N of the task queue ideal
+ * state: the list field holds the target instance and the map field holds the message's simple
+ * fields plus the controller message id. The number of queued tasks is then written to the
+ * "SentMessageCount" map field of the scheduler message's status update.
+ *
+ * @param recipientCriteria criteria used to generate the task messages
+ * @param messageTemplate template the task messages are generated from
+ * @param controllerMsgId id of the originating scheduler message, stored with every task
+ * @throws HelixException if messages were generated but the scheduler message has no
+ * SCHEDULER_TASK_QUEUE simple field
+ */
+ void handleMessageUsingScheduledTaskQueue(Criteria recipientCriteria, Message messageTemplate, String controllerMsgId)
+ {
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
+ Map<String, String> sendSummary = new HashMap<String, String>();
+ sendSummary.put("MessageCount", "0");
+ Map<InstanceType, List<Message>> messages
+ = _manager.getMessagingService().generateMessage(recipientCriteria, messageTemplate);
+
+ // Calculate tasks, and put them into the idealState of the SCHEDULER_TASK_QUEUE resource.
+ // List field are the destination node, while the Message parameters are stored in the mapFields
+ // task throttling can be done on SCHEDULER_TASK_QUEUE resource
+ if(!messages.isEmpty())
+ {
+ String taskQueueName = _message.getRecord().getSimpleField(SCHEDULER_TASK_QUEUE);
+ if(taskQueueName == null)
+ {
+ throw new HelixException("SchedulerTaskMessage need to have " + SCHEDULER_TASK_QUEUE +" specified.");
+ }
+ IdealState newAddedScheduledTasks = new IdealState(taskQueueName);
+ newAddedScheduledTasks.setBucketSize(TASKQUEUE_BUCKET_NUM);
+ newAddedScheduledTasks.setStateModelDefRef(SCHEDULER_TASK_QUEUE);
+
+ // Serialize the read of the current top partition id and the ideal state update on the manager
+ synchronized(_manager)
+ {
+ int existingTopPartitionId = 0;
+ IdealState currentTaskQueue = accessor.getProperty(keyBuilder.idealStates(newAddedScheduledTasks.getId()));
+ if(currentTaskQueue != null)
+ {
+ // Continue numbering after the highest partition id already in the queue
+ existingTopPartitionId = findTopPartitionId(currentTaskQueue) + 1;
+ }
+
+ // Only the first entry of the generated map is queued (same element the previous
+ // values().toArray()[0] picked), now without an unchecked cast.
+ // NOTE(review): presumably the map holds a single entry for PARTICIPANT recipients - confirm
+ List<Message> taskMessages = messages.values().iterator().next();
+ for(Message task : taskMessages)
+ {
+ String partitionId = taskQueueName + "_" + existingTopPartitionId;
+ existingTopPartitionId++;
+ String instanceName = task.getTgtName();
+ newAddedScheduledTasks.setPartitionState(partitionId, instanceName, "COMPLETED");
+ // The task's simple fields become the partition's map field below, so the
+ // instance -> COMPLETED entry and the controller message id are added to them first
+ task.getRecord().setSimpleField(instanceName, "COMPLETED");
+ task.getRecord().setSimpleField(CONTROLLER_MSG_ID, controllerMsgId);
+
+ List<String> priorityList = new LinkedList<String>();
+ priorityList.add(instanceName);
+ newAddedScheduledTasks.getRecord().setListField(partitionId, priorityList);
+ newAddedScheduledTasks.getRecord().setMapField(partitionId, task.getRecord().getSimpleFields());
+ _logger.info("Scheduling for controllerMsg " + controllerMsgId + " , sending task " + partitionId + " " + task.getMsgId()
+ + " to "+instanceName );
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug(task.getRecord().getSimpleFields());
+ }
+ }
+ accessor.updateProperty(keyBuilder.idealStates(newAddedScheduledTasks.getId()), newAddedScheduledTasks);
+ sendSummary.put("MessageCount", "" + taskMessages.size());
+ }
+ }
+ // Record the number of messages sent into scheduler message status updates
+
+ // NOTE(review): getProperty's result is dereferenced without a null check - confirm the status update always exists here
+ ZNRecord statusUpdate = accessor.getProperty(
+ keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(),
+ _message.getMsgId())).getRecord();
+
+ statusUpdate.getMapFields().put("SentMessageCount", sendSummary);
+ accessor.updateProperty(keyBuilder.controllerTaskStatus(
+ MessageType.SCHEDULER_MSG.toString(), _message.getMsgId()),
+ new StatusUpdate(statusUpdate));
+ }
+
+
+ /**
+ * Finds the largest numeric suffix among the partition names of a task queue.
+ *
+ * The suffix is the text after the last '_' of the partition name (the whole name when it
+ * contains no '_'). Names whose suffix is not an integer are logged and skipped.
+ *
+ * @param currentTaskQueue ideal state of the task queue whose partition names are scanned
+ * @return the largest suffix found, or 0 when there is none greater than 0
+ */
+ private int findTopPartitionId(IdealState currentTaskQueue)
+ {
+ int topId = 0;
+ for(String partitionName : currentTaskQueue.getPartitionSet())
+ {
+ String partitionNumStr = partitionName.substring(partitionName.lastIndexOf('_') + 1);
+ try
+ {
+ topId = Math.max(topId, Integer.parseInt(partitionNumStr));
+ }
+ catch(NumberFormatException e)
+ {
+ // A name without a numeric suffix cannot influence the top id; report it and continue
+ _logger.error("Cannot parse partition id from partition name " + partitionName, e);
+ }
+ }
+ return topId;
+ }
@Override
public HelixTaskResult handleMessage() throws InterruptedException
@@ -227,8 +331,21 @@ public class DefaultSchedulerMessageHandlerFactory implements
_logger.warn("",e);
}
}
- // Send all messages.
+ // If the target is PARTICIPANT, use the ScheduledTaskQueue
+ if(InstanceType.PARTICIPANT == recipientCriteria.getRecipientInstanceType())
+ {
+ handleMessageUsingScheduledTaskQueue(recipientCriteria, messageTemplate, _message.getMsgId());
+ result.setSuccess(true);
+ result.getTaskResultMap().put(SCHEDULER_MSG_ID, _message.getMsgId());
+ result.getTaskResultMap().put(
+ "ControllerResult",
+ "msg " + _message.getMsgId() + " from " + _message.getMsgSrc()
+ + " processed");
+ return result;
+ }
+
+ _logger.info("Scheduler sending message to Controller");
int nMsgsSent = 0;
SchedulerAsyncCallback callback = new SchedulerAsyncCallback(_message, _manager);
if(waitAll)
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 3d13cd0..dc0f5b6 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -72,6 +72,7 @@ import org.apache.helix.monitoring.ZKPathDataDumpTask;
import org.apache.helix.participant.DistClusterControllerElection;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
import org.apache.helix.store.ZNRecordJsonSerializer;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.tools.PropertiesReader;
@@ -792,6 +793,9 @@ public class ZKHelixManager implements HelixManager
_stateMachEngine);
addMessageListener(_messagingService.getExecutor(), _instanceName);
addControllerListener(_helixAccessor);
+
+ ScheduledTaskStateModelFactory stStateModelFactory = new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
+ _stateMachEngine.registerStateModelFactory(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, stStateModelFactory);
if (_participantHealthCheckInfoCollector == null)
{
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index b6812a4..558c91c 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -160,7 +160,7 @@ public class DefaultMessagingService implements ClusterMessagingService
return totalMessageCount;
}
- private Map<InstanceType, List<Message>> generateMessage(final Criteria recipientCriteria,
+ public Map<InstanceType, List<Message>> generateMessage(final Criteria recipientCriteria,
final Message message)
{
Map<InstanceType, List<Message>> messagesToSendMap =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index c674d69..4220d22 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -50,12 +50,10 @@ import org.apache.helix.MessageListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.NotificationContext.MapKey;
import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
import org.apache.helix.NotificationContext.Type;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.Attributes;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.monitoring.ParticipantMonitor;
@@ -624,7 +622,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
}
}
- private MessageHandler createMessageHandler(Message message,
+ public MessageHandler createMessageHandler(Message message,
NotificationContext changeContext)
{
String msgType = message.getMsgType().toString();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index e0ae9aa..9d07404 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -79,7 +79,8 @@ public class Message extends HelixProperty
RETRY_COUNT,
STATE_MODEL_FACTORY_NAME,
BUCKET_SIZE,
- PARENT_MSG_ID // used for group message mode
+ PARENT_MSG_ID, // used for group message mode
+ INNER_MESSAGE
}
public enum MessageState
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
new file mode 100644
index 0000000..e4df933
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
@@ -0,0 +1,127 @@
+package org.apache.helix.participant.statemachine;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.messaging.handling.HelixTaskExecutor;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+
+
+public class ScheduledTaskStateModel extends StateModel
+{
+ static final String DEFAULT_INITIAL_STATE = "OFFLINE";
+ Logger logger = Logger.getLogger(ScheduledTaskStateModel.class);
+
+ // TODO Get default state from implementation or from state model annotation
+ // StateModel with initial state other than OFFLINE should override this field
+ protected String _currentState = DEFAULT_INITIAL_STATE;
+ final ScheduledTaskStateModelFactory _factory;
+ final String _partitionName;
+
+ final HelixTaskExecutor _executor;
+
+ public ScheduledTaskStateModel(ScheduledTaskStateModelFactory factory, HelixTaskExecutor executor, String partitionName)
+ {
+ _factory = factory;
+ _partitionName = partitionName;
+ _executor = executor;
+ }
+
+ @Transition(to="COMPLETED",from="OFFLINE")
+ public void onBecomeCompletedFromOffline(Message message,
+ NotificationContext context) throws InterruptedException
+ {
+ logger.info(_partitionName + " onBecomeCompletedFromOffline");
+
+ // Construct the inner task message from the mapfields of scheduledTaskQueue resource group
+ Map<String, String> messageInfo = message.getRecord().getMapField(Message.Attributes.INNER_MESSAGE.toString());
+ ZNRecord record = new ZNRecord(_partitionName);
+ record.getSimpleFields().putAll(messageInfo);
+ Message taskMessage = new Message(record);
+ if(logger.isDebugEnabled())
+ {
+ logger.debug(taskMessage.getRecord().getSimpleFields());
+ }
+ MessageHandler handler = _executor.createMessageHandler(taskMessage, new NotificationContext(null));
+ if (handler == null)
+ {
+ throw new HelixException("Task message " + taskMessage.getMsgType() + " handler not found, task id " + _partitionName);
+ }
+ // Invoke the internal handler to complete the task
+ handler.handleMessage();
+ logger.info(_partitionName + " onBecomeCompletedFromOffline completed");
+ }
+
+ @Transition(to="OFFLINE",from="COMPLETED")
+ public void onBecomeOfflineFromCompleted(Message message,
+ NotificationContext context)
+ {
+ logger.info(_partitionName + " onBecomeOfflineFromCompleted");
+ }
+
+ @Transition(to="DROPPED",from="COMPLETED")
+ public void onBecomeDroppedFromCompleted(Message message,
+ NotificationContext context)
+ {
+ logger.info(_partitionName + " onBecomeDroppedFromCompleted");
+ removeFromStatemodelFactory();
+ }
+
+
+ @Transition(to="DROPPED",from="OFFLINE")
+ public void onBecomeDroppedFromOffline(Message message,
+ NotificationContext context) throws InterruptedException
+ {
+ logger.info(_partitionName + " onBecomeDroppedFromScheduled");
+ removeFromStatemodelFactory();
+ }
+
+ @Transition(to="OFFLINE",from="ERROR")
+ public void onBecomeOfflineFromError(Message message,
+ NotificationContext context) throws InterruptedException
+ {
+ logger.info(_partitionName + " onBecomeOfflineFromError");
+ }
+
+ public void reset()
+ {
+ logger.info(_partitionName + " ScheduledTask reset");
+ removeFromStatemodelFactory();
+ }
+
+ // We need this to prevent state model leak
+ private void removeFromStatemodelFactory()
+ {
+ if(_factory.getStateModelMap().containsKey(_partitionName))
+ {
+ _factory.getStateModelMap().remove(_partitionName);
+ }
+ else
+ {
+ logger.warn(_partitionName + " not found in ScheduledTaskStateModelFactory");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
new file mode 100644
index 0000000..6f6d1ed
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModelFactory.java
@@ -0,0 +1,43 @@
+package org.apache.helix.participant.statemachine;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.messaging.handling.HelixTaskExecutor;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Factory producing one {@link ScheduledTaskStateModel} per partition name.
+ * Every model it creates is given this factory and the executor supplied at
+ * construction time.
+ */
+public class ScheduledTaskStateModelFactory extends StateModelFactory<ScheduledTaskStateModel>
+{
+  Logger logger = Logger.getLogger(ScheduledTaskStateModelFactory.class);
+
+  HelixTaskExecutor _executor;
+
+  /**
+   * @param executor executor passed on to every state model created by this factory
+   */
+  public ScheduledTaskStateModelFactory(HelixTaskExecutor executor)
+  {
+    this._executor = executor;
+  }
+
+  /**
+   * Logs the request and builds the state model for the given partition.
+   *
+   * @param partitionName name of the partition the new model belongs to
+   * @return a new model bound to this factory and its executor
+   */
+  @Override
+  public ScheduledTaskStateModel createNewStateModel(String partitionName)
+  {
+    logger.info("Create state model for ScheduledTask " + partitionName);
+    final ScheduledTaskStateModel stateModel =
+        new ScheduledTaskStateModel(this, _executor, partitionName);
+    return stateModel;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 59cb76b..bde82f6 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -162,6 +162,9 @@ public class ClusterSetup
addStateModelDef(clusterName,
"OnlineOffline",
new StateModelDefinition(generator.generateConfigForOnlineOffline()));
+ addStateModelDef(clusterName,
+ "ScheduledTask",
+ new StateModelDefinition(generator.generateConfigForScheduledTaskQueue()));
}
public void activateCluster(String clusterName, String grandCluster, boolean enable)
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
index 53278e2..dada08a 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java
@@ -25,8 +25,11 @@ import java.util.List;
import java.util.Map;
import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.Transition;
import org.apache.helix.model.StateModelDefinition.StateModelDefinitionProperty;
+import org.apache.helix.participant.statemachine.StateTransitionTableBuilder;
public class StateModelConfigGenerator
@@ -348,4 +351,64 @@ public class StateModelConfigGenerator
// ZNRecordSerializer serializer = new ZNRecordSerializer();
// System.out.println(new String(serializer.serialize(record)));
}
+
+  /**
+   * Builds the state model definition record for the scheduler task queue.
+   *
+   * The record has initial state OFFLINE, the state priority list
+   * COMPLETED, OFFLINE, DROPPED, a "count" metadata entry per state
+   * ("1" for COMPLETED, "-1" for the others), the next-state table computed
+   * from the three allowed transitions, and the transition priority list.
+   *
+   * @return the populated record
+   */
+  public ZNRecord generateConfigForScheduledTaskQueue()
+  {
+    ZNRecord definition = new ZNRecord(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE);
+    definition.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(), "OFFLINE");
+
+    List<String> priorityOrder = new ArrayList<String>();
+    priorityOrder.add("COMPLETED");
+    priorityOrder.add("OFFLINE");
+    priorityOrder.add("DROPPED");
+    definition.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(), priorityOrder);
+
+    // Per-state metadata: only COMPLETED gets count "1"; the other states get "-1".
+    for (String stateName : priorityOrder)
+    {
+      Map<String, String> meta = new HashMap<String, String>();
+      meta.put("count", "COMPLETED".equals(stateName) ? "1" : "-1");
+      definition.setMapField(stateName + ".meta", meta);
+    }
+
+    List<String> allStates = new ArrayList<String>();
+    allStates.add("COMPLETED");
+    allStates.add("DROPPED");
+    allStates.add("OFFLINE");
+
+    List<Transition> allowed = new ArrayList<Transition>();
+    allowed.add(new Transition("OFFLINE", "COMPLETED"));
+    allowed.add(new Transition("OFFLINE", "DROPPED"));
+    allowed.add(new Transition("COMPLETED", "DROPPED"));
+
+    // Next-state table derived from the allowed transitions, stored per state.
+    Map<String, Map<String, String>> nextTable =
+        new StateTransitionTableBuilder().buildTransitionTable(allStates, allowed);
+    for (String stateName : priorityOrder)
+    {
+      definition.setMapField(stateName + ".next", nextTable.get(stateName));
+    }
+
+    List<String> transitionPriority = new ArrayList<String>();
+    transitionPriority.add("OFFLINE-COMPLETED");
+    transitionPriority.add("OFFLINE-DROPPED");
+    transitionPriority.add("COMPLETED-DROPPED");
+    definition.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
+        transitionPriority);
+
+    return definition;
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index b0aa4ac..d71556a 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -791,5 +791,13 @@ public class Mocks {
return 0;
}
+ @Override
+ public Map<InstanceType, List<Message>> generateMessage(
+ Criteria recipientCriteria, Message messageTemplate)
+ {
+ // Mock stub: ignores both arguments and always returns null.
+ return null;
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/5e8a912b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 98421c3..5ee4f85 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -19,13 +19,17 @@ package org.apache.helix.integration;
* under the License.
*/
+import java.io.IOException;
import java.io.StringWriter;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
import org.apache.helix.Criteria;
import org.apache.helix.HelixDataAccessor;
@@ -46,6 +50,8 @@ import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.monitoring.ZKPathDataDumpTask;
import org.apache.helix.util.HelixUtil;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.testng.Assert;
@@ -54,6 +60,27 @@ import org.testng.annotations.Test;
public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServerCheck
{
+
+  /**
+   * Callback used by the scheduler message tests: it keeps the most recent
+   * reply message so the test can inspect it, and ignores timeouts.
+   */
+  class MockAsyncCallback extends AsyncCallback
+  {
+    // Last reply handed to onReplyMessage(); stays null until a reply arrives.
+    Message _message;
+
+    public MockAsyncCallback()
+    {
+    }
+
+    /** Timeouts are deliberately ignored. */
+    @Override
+    public void onTimeOut()
+    {
+    }
+
+    /** Stores the reply, replacing any earlier one. */
+    @Override
+    public void onReplyMessage(Message message)
+    {
+      this._message = message;
+    }
+  }
TestMessagingHandlerFactory _factory = new TestMessagingHandlerFactory();
public static class TestMessagingHandlerFactory implements
MessageHandlerFactory
@@ -101,8 +128,8 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
new ConcurrentSkipListSet<String>());
}
}
- _results.get(_message.getPartitionName()).add(destName);
-
+ _results.get(_message.getPartitionName()).add(_message.getMsgId());
+ //System.err.println("Message " + _message.getMsgId() + " executed");
return result;
}
@@ -110,10 +137,77 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
public void onError(Exception e, ErrorCode code, ErrorType type)
{
// TODO Auto-generated method stub
+ }
+ }
+ }
+
+ // Handler factory for the constraint test: every handler it creates blocks in
+ // handleMessage() on the factory's current latch until signal() is called.
+ public static class TestMessagingHandlerFactoryLatch implements
+ MessageHandlerFactory
+ {
+ public volatile CountDownLatch _latch = new CountDownLatch(1);
+ // Incremented under the factory lock in createHandler() but read by callers
+ // without that lock, so it is volatile to make every update visible to them.
+ public volatile int _messageCount = 0;
+ public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
+ // Counts the request and returns a new latch-gated handler for the message.
+ @Override
+ public synchronized MessageHandler createHandler(Message message,
+ NotificationContext context)
+ {
+ _messageCount++;
+ return new TestMessagingHandlerLatch(message, context);
+ }
+
+ // Releases the handlers waiting on the current latch, then installs a fresh
+ // latch so handlers that start waiting afterwards block until the next signal().
+ public synchronized void signal()
+ {
+ _latch.countDown();
+ _latch = new CountDownLatch(1);
+ }
+
+ @Override
+ public String getMessageType()
+ {
+ return "TestMessagingHandlerLatch";
+ }
+
+ @Override
+ public void reset()
+ {
+ // Nothing to reset.
+ }
+ public class TestMessagingHandlerLatch extends MessageHandler
+ {
+ public TestMessagingHandlerLatch(Message message, NotificationContext context)
+ {
+ super(message, context);
+ }
+
+ // Waits on the factory's current latch, then records the message's target
+ // name under its partition name in _results and reports success.
+ @Override
+ public HelixTaskResult handleMessage() throws InterruptedException
+ {
+ _latch.await();
+ HelixTaskResult result = new HelixTaskResult();
+ result.setSuccess(true);
+ String destName = _message.getTgtName();
+ synchronized (_results)
+ {
+ if (!_results.containsKey(_message.getPartitionName()))
+ {
+ _results.put(_message.getPartitionName(),
+ new ConcurrentSkipListSet<String>());
+ }
}
+ _results.get(_message.getPartitionName()).add(destName);
+ //System.err.println("Message " + _message.getMsgId() + " executed");
+ return result;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type)
+ {
+ // Errors are ignored by this test handler.
}
}
+}
@Test()
public void TestSchedulerMsg() throws Exception
@@ -134,7 +228,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
schedulerMessage.setTgtName("CONTROLLER");
// TODO: change it to "ADMIN" ?
schedulerMessage.setSrcName("CONTROLLER");
-
+ schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg");
// Template for the individual message sent to each participant
Message msg = new Message(_factory.getMessageType(), "Template");
msg.setTgtSessionId("*");
@@ -167,8 +261,6 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
helixDataAccessor.createProperty(
keyBuilder.controllerMessage(schedulerMessage.getMsgId()),
schedulerMessage);
-
- Thread.sleep(15000);
for(int i = 0; i < 30; i++)
{
@@ -323,35 +415,27 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
cr2.setInstanceName("*");
cr2.setSessionSpecific(false);
-
- class MockAsyncCallback extends AsyncCallback
- {
- Message _message;
- public MockAsyncCallback()
- {
- }
- @Override
- public void onTimeOut()
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void onReplyMessage(Message message)
- {
- _message = message;
- }
-
- }
+ schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg2");
MockAsyncCallback callback = new MockAsyncCallback();
manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
Builder keyBuilder = helixDataAccessor.keyBuilder();
-
+ for(int i = 0;i < 10; i++ )
+ {
+ Thread.sleep(200);
+ PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+ MessageType.SCHEDULER_MSG.toString(), msgId);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+ .getRecord();
+ if(statusUpdate.getMapFields().containsKey("Summary"))
+ {
+ break;
+ }
+ }
+
Assert.assertEquals(_PARTITIONS, _factory._results.size());
PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
MessageType.SCHEDULER_MSG.toString(), msgId);
@@ -446,4 +530,447 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
Assert.assertEquals(count, 0);
}
+
+
+ @Test() // Sends one scheduler message per targeted participant through task queue "TestSchedulerMsg3" and checks the status update after each.
+ public void TestSchedulerMsg3() throws Exception
+ {
+ _factory._results.clear();
+ HelixManager manager = null;
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String hostDest = "localhost_" + (START_PORT + i);
+ _startCMResultMap.get(hostDest)._manager.getMessagingService()
+ .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
+ // NOTE(review): the registration below repeats the call above verbatim - looks redundant, confirm.
+ _startCMResultMap.get(hostDest)._manager.getMessagingService()
+ .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
+ manager = _startCMResultMap.get(hostDest)._manager;
+ }
+ // Scheduler message addressed to the controller; "manager" is the last participant's manager.
+ Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
+ .randomUUID().toString());
+ schedulerMessage.setTgtSessionId("*");
+ schedulerMessage.setTgtName("CONTROLLER");
+ // TODO: change it to "ADMIN" ?
+ schedulerMessage.setSrcName("CONTROLLER");
+
+ // Template for the individual message sent to each participant
+ Message msg = new Message(_factory.getMessageType(), "Template");
+ msg.setTgtSessionId("*");
+ msg.setMsgState(MessageState.NEW);
+
+ // Criteria to send individual messages
+ Criteria cr = new Criteria();
+ cr.setInstanceName("localhost_%");
+ cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ cr.setSessionSpecific(false);
+ cr.setResource("%");
+ cr.setPartition("%");
+ // Serialize the criteria to JSON so it can be stored as a simple field of the scheduler message.
+ ObjectMapper mapper = new ObjectMapper();
+ SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ StringWriter sw = new StringWriter();
+ mapper.writeValue(sw, cr);
+
+ String crString = sw.toString();
+
+ schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+ schedulerMessage.getRecord().setMapField("MessageTemplate",
+ msg.getRecord().getSimpleFields());
+ schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+ schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
+ // Names the task queue this scheduler message is associated with.
+ schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg3");
+ Criteria cr2 = new Criteria();
+ cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
+ cr2.setInstanceName("*");
+ cr2.setSessionSpecific(false);
+ // NOTE(review): the criteria is serialized again here with the same values; the loop below overwrites the field on every pass.
+ MockAsyncCallback callback = new MockAsyncCallback();
+ cr.setInstanceName("localhost_%");
+ mapper = new ObjectMapper();
+ serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ sw = new StringWriter();
+ mapper.writeValue(sw, cr);
+
+ crString = sw.toString();
+ schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+ // Each pass targets a single participant and sends the message under a fresh message id.
+ for(int i = 0; i < 4; i++) // NOTE(review): hard-coded 4 rather than NODE_NR - confirm intended.
+ {
+ callback = new MockAsyncCallback();
+ cr.setInstanceName("localhost_"+(START_PORT + i));
+ mapper = new ObjectMapper();
+ serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ sw = new StringWriter();
+ mapper.writeValue(sw, cr);
+ schedulerMessage.setMsgId(UUID.randomUUID().toString());
+ crString = sw.toString();
+ schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+ manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
+ String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
+
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+ // Poll (at most 100 times, 200 ms apart) until the controller status update contains "Summary".
+ for(int j = 0;j < 100; j++ )
+ {
+ Thread.sleep(200);
+ PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+ MessageType.SCHEDULER_MSG.toString(), msgId);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+ .getRecord();
+ if(statusUpdate.getMapFields().containsKey("Summary"))
+ {
+ break;
+ }
+ }
+ // Re-read the status update and check the sent-message count and the number of MessageResult entries.
+ PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+ MessageType.SCHEDULER_MSG.toString(), msgId);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+ .getRecord();
+ Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
+ .get("MessageCount").equals("" + (_PARTITIONS * 3 / 5))); // presumably 3 replicas per partition over 5 nodes - TODO confirm
+ int messageResultCount = 0;
+ for(String key : statusUpdate.getMapFields().keySet())
+ {
+ if(key.startsWith("MessageResult"))
+ {
+ messageResultCount ++;
+ }
+ }
+ Assert.assertEquals(messageResultCount, _PARTITIONS * 3 / 5);
+ // _factory._results is cleared only at the start of the test, so the total grows with every pass.
+ int count = 0;
+ //System.out.println(i);
+ for (Set<String> val : _factory._results.values())
+ {
+ //System.out.println(val);
+ count += val.size();
+ }
+ //System.out.println(count);
+ Assert.assertEquals(count, _PARTITIONS * 3/ 5 *(i+1) );
+ }
+ }
+
+
+ @Test()
+ public void TestSchedulerMsg4() throws Exception
+ {
+ _factory._results.clear();
+ HelixManager manager = null;
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String hostDest = "localhost_" + (START_PORT + i);
+ _startCMResultMap.get(hostDest)._manager.getMessagingService()
+ .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
+ //
+ _startCMResultMap.get(hostDest)._manager.getMessagingService()
+ .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
+ manager = _startCMResultMap.get(hostDest)._manager;
+ }
+
+ Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
+ .randomUUID().toString());
+ schedulerMessage.setTgtSessionId("*");
+ schedulerMessage.setTgtName("CONTROLLER");
+ // TODO: change it to "ADMIN" ?
+ schedulerMessage.setSrcName("CONTROLLER");
+
+ // Template for the individual message sent to each participant
+ Message msg = new Message(_factory.getMessageType(), "Template");
+ msg.setTgtSessionId("*");
+ msg.setMsgState(MessageState.NEW);
+
+ // Criteria to send individual messages
+ Criteria cr = new Criteria();
+ cr.setInstanceName("localhost_%");
+ cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ cr.setSessionSpecific(false);
+ cr.setResource("TestDB");
+ cr.setPartition("%");
+
+ ObjectMapper mapper = new ObjectMapper();
+ SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ StringWriter sw = new StringWriter();
+ mapper.writeValue(sw, cr);
+
+ String crString = sw.toString();
+
+ schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+ schedulerMessage.getRecord().setMapField("MessageTemplate",
+ msg.getRecord().getSimpleFields());
+ schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+ schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
+
+ schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg4");
+ Criteria cr2 = new Criteria();
+ cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
+ cr2.setInstanceName("*");
+ cr2.setSessionSpecific(false);
+
+ Map<String, String> constraints = new TreeMap<String, String>();
+ constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
+ constraints.put("TRANSITION", "OFFLINE-COMPLETED");
+ constraints.put("CONSTRAINT_VALUE", "1");
+ constraints.put("INSTANCE", ".*");
+ manager.getClusterManagmentTool().addMessageConstraint(manager.getClusterName(), "constraint1", constraints);
+
+ MockAsyncCallback callback = new MockAsyncCallback();
+ cr.setInstanceName("localhost_%");
+ mapper = new ObjectMapper();
+ serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ sw = new StringWriter();
+ mapper.writeValue(sw, cr);
+
+ crString = sw.toString();
+ schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+ manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
+ String msgIdPrime = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
+
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+ ArrayList<String> msgIds = new ArrayList<String>();
+ for(int i = 0; i < NODE_NR; i++)
+ {
+ callback = new MockAsyncCallback();
+ cr.setInstanceName("localhost_"+(START_PORT + i));
+ mapper = new ObjectMapper();
+ serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ sw = new StringWriter();
+ mapper.writeValue(sw, cr);
+ schedulerMessage.setMsgId(UUID.randomUUID().toString());
+ crString = sw.toString();
+ schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+ manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
+ String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
+ msgIds.add(msgId);
+ }
+ for(int i = 0; i < NODE_NR; i++)
+ {
+ String msgId = msgIds.get(i);
+ for(int j = 0;j < 100; j++ )
+ {
+ Thread.sleep(200);
+ PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+ MessageType.SCHEDULER_MSG.toString(), msgId);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+ .getRecord();
+ if(statusUpdate.getMapFields().containsKey("Summary"))
+ {
+ //System.err.println(msgId+" done");
+ break;
+ }
+ }
+
+ PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+ MessageType.SCHEDULER_MSG.toString(), msgId);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+ .getRecord();
+ Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
+ .get("MessageCount").equals("" + (_PARTITIONS * 3 / 5)));
+ int messageResultCount = 0;
+ for(String key : statusUpdate.getMapFields().keySet())
+ {
+ if(key.startsWith("MessageResult"))
+ {
+ messageResultCount ++;
+ }
+ }
+ if(messageResultCount != _PARTITIONS * 3 / 5)
+ {
+ int x = 10;
+ x = x + messageResultCount;
+ }
+ Assert.assertEquals(messageResultCount, _PARTITIONS * 3 / 5);
+ }
+
+ for(int j = 0;j < 100; j++ )
+ {
+ Thread.sleep(200);
+ PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+ MessageType.SCHEDULER_MSG.toString(), msgIdPrime);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+ .getRecord();
+ if(statusUpdate.getMapFields().containsKey("Summary"))
+ {
+ break;
+ }
+ }
+ int count = 0;
+ for (Set<String> val : _factory._results.values())
+ {
+ //System.out.println(val);
+ count += val.size();
+ }
+ //System.out.println(count);
+ Assert.assertEquals(count, _PARTITIONS * 3 * 2 );
+ }
+
+
+ @Test
+ public void TestSchedulerMsgContraints() throws JsonGenerationException, JsonMappingException, IOException, InterruptedException
+ {
+ TestMessagingHandlerFactoryLatch factory = new TestMessagingHandlerFactoryLatch();
+ HelixManager manager = null;
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String hostDest = "localhost_" + (START_PORT + i);
+ _startCMResultMap.get(hostDest)._manager.getMessagingService()
+ .registerMessageHandlerFactory(factory.getMessageType(), factory);
+ //
+ _startCMResultMap.get(hostDest)._manager.getMessagingService()
+ .registerMessageHandlerFactory(factory.getMessageType(), factory);
+ manager = _startCMResultMap.get(hostDest)._manager;
+ }
+
+ Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
+ .randomUUID().toString());
+ schedulerMessage.setTgtSessionId("*");
+ schedulerMessage.setTgtName("CONTROLLER");
+ // TODO: change it to "ADMIN" ?
+ schedulerMessage.setSrcName("CONTROLLER");
+
+ // Template for the individual message sent to each participant
+ Message msg = new Message(factory.getMessageType(), "Template");
+ msg.setTgtSessionId("*");
+ msg.setMsgState(MessageState.NEW);
+
+ // Criteria to send individual messages
+ Criteria cr = new Criteria();
+ cr.setInstanceName("localhost_%");
+ cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ cr.setSessionSpecific(false);
+ cr.setResource("%");
+ cr.setPartition("%");
+
+ ObjectMapper mapper = new ObjectMapper();
+ SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ StringWriter sw = new StringWriter();
+ mapper.writeValue(sw, cr);
+
+ String crString = sw.toString();
+
+ schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+ schedulerMessage.getRecord().setMapField("MessageTemplate",
+ msg.getRecord().getSimpleFields());
+ schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+ schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
+ schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgContraints");
+
+ Criteria cr2 = new Criteria();
+ cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
+ cr2.setInstanceName("*");
+ cr2.setSessionSpecific(false);
+
+
+ MockAsyncCallback callback = new MockAsyncCallback();
+ mapper = new ObjectMapper();
+ serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ sw = new StringWriter();
+ mapper.writeValue(sw, cr);
+
+
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+
+ // Set contraints that only 1 msg per participant
+ Map<String, String> constraints = new TreeMap<String, String>();
+ constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
+ constraints.put("TRANSITION", "OFFLINE-COMPLETED");
+ constraints.put("CONSTRAINT_VALUE", "1");
+ constraints.put("INSTANCE", ".*");
+ manager.getClusterManagmentTool().addMessageConstraint(manager.getClusterName(), "constraint1", constraints);
+
+ // Send scheduler message
+ crString = sw.toString();
+ schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+ manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
+ String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
+
+ for(int j = 0;j < 10; j++ )
+ {
+ Thread.sleep(200);
+ PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+ MessageType.SCHEDULER_MSG.toString(), msgId);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+ .getRecord();
+ if(statusUpdate.getMapFields().containsKey("SentMessageCount"))
+ {
+ Assert.assertEquals(statusUpdate.getMapFields().get("SentMessageCount").get("MessageCount"), ""+(_PARTITIONS * 3));
+ break;
+ }
+ }
+
+ for(int i = 0; i < _PARTITIONS * 3 / 5; i++)
+ {
+ for(int j = 0; i< 10; j++)
+ {
+ Thread.sleep(300);
+ if(factory._messageCount == 5*(i+1)) break;
+ }
+ Thread.sleep(300);
+ Assert.assertEquals(factory._messageCount, 5*(i+1));
+ factory.signal();
+ //System.err.println(i);
+ }
+
+ for(int j = 0;j < 10; j++ )
+ {
+ Thread.sleep(200);
+ PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+ MessageType.SCHEDULER_MSG.toString(), msgId);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+ .getRecord();
+ if(statusUpdate.getMapFields().containsKey("Summary"))
+ {
+ break;
+ }
+ }
+
+ Assert.assertEquals(_PARTITIONS, factory._results.size());
+ PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+ MessageType.SCHEDULER_MSG.toString(), msgId);
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+ .getRecord();
+ Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
+ .get("MessageCount").equals("" + (_PARTITIONS * 3)));
+ int messageResultCount = 0;
+ for(String key : statusUpdate.getMapFields().keySet())
+ {
+ if(key.startsWith("MessageResult "))
+ {
+ messageResultCount ++;
+ }
+ }
+ Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+
+ int count = 0;
+ for (Set<String> val : factory._results.values())
+ {
+ count += val.size();
+ }
+ Assert.assertEquals(count, _PARTITIONS * 3);
+
+ manager.getClusterManagmentTool().addMessageConstraint(manager.getClusterName(), "constraint1", new TreeMap<String, String>());
+
+ }
}