You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by sl...@apache.org on 2013/05/17 03:13:35 UTC
git commit: [HELIX-97]Scheduler message summary does not contain
individual message results
Updated Branches:
refs/heads/master 0e9ea9d49 -> 96cc67417
[HELIX-97]Scheduler message summary does not contain individual message
results
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/96cc6741
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/96cc6741
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/96cc6741
Branch: refs/heads/master
Commit: 96cc674174ae7e401f54bef43c11c0bc0f02986b
Parents: 0e9ea9d
Author: slu2011 <lu...@gmail.com>
Authored: Thu May 16 18:13:20 2013 -0700
Committer: slu2011 <lu...@gmail.com>
Committed: Thu May 16 18:13:20 2013 -0700
----------------------------------------------------------------------
.../webapp/resources/SchedulerTasksResource.java | 5 +-
.../zk/DefaultSchedulerMessageHandlerFactory.java | 4 +-
.../helix/integration/TestSchedulerMessage.java | 114 ++++++++++++++-
3 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/96cc6741/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
index 097771e..3791a09 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
@@ -170,11 +170,10 @@ public class SchedulerTasksResource extends Resource
schedulerMessage.setTgtName("CONTROLLER");
schedulerMessage.setSrcInstanceType(InstanceType.CONTROLLER);
String taskQueueName = ClusterRepresentationUtil.getFormJsonParameterString(form, TASKQUEUENAME);
- if(taskQueueName != null)
+ if(taskQueueName != null && taskQueueName.length() > 0)
{
- throw new HelixException("SchedulerTasksResource need to have " + TASKQUEUENAME +" specified.");
+ schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, taskQueueName);
}
- schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, taskQueueName);
accessor.setProperty(accessor.keyBuilder().controllerMessage(schedulerMessage.getMsgId()), schedulerMessage);
Map<String, String> resultMap = new HashMap<String, String>();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/96cc6741/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
index 49101c9..4fe8750 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
@@ -331,9 +331,9 @@ public class DefaultSchedulerMessageHandlerFactory implements
_logger.warn("",e);
}
}
-
+ boolean hasSchedulerTaskQueue = _message.getRecord().getSimpleFields().containsKey(SCHEDULER_TASK_QUEUE);
// If the target is PARTICIPANT, use the ScheduledTaskQueue
- if(InstanceType.PARTICIPANT == recipientCriteria.getRecipientInstanceType())
+ if(InstanceType.PARTICIPANT == recipientCriteria.getRecipientInstanceType() && hasSchedulerTaskQueue)
{
handleMessageUsingScheduledTaskQueue(recipientCriteria, messageTemplate, _message.getMsgId());
result.setSuccess(true);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/96cc6741/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index cc4f9ea..c867132 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -53,6 +53,8 @@ import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StatusUpdate;
import org.apache.helix.monitoring.ZKPathDataDumpTask;
import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
@@ -123,6 +125,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
HelixTaskResult result = new HelixTaskResult();
result.setSuccess(true);
String destName = _message.getTgtName();
+ result.getTaskResultMap().put("Message", _message.getMsgId());
synchronized (_results)
{
if (!_results.containsKey(_message.getPartitionName()))
@@ -190,6 +193,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
_latch.await();
HelixTaskResult result = new HelixTaskResult();
result.setSuccess(true);
+ result.getTaskResultMap().put("Message", _message.getMsgId());
String destName = _message.getTgtName();
synchronized (_results)
{
@@ -211,10 +215,112 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
}
}
+
+ @Test()
+ public void TestSchedulerMsgUsingQueue() throws Exception
+ {
+ Logger.getRootLogger().setLevel(Level.INFO);
+ _factory._results.clear();
+ HelixManager manager = null;
+ for (int i = 0; i < NODE_NR; i++)
+ {
+ String hostDest = "localhost_" + (START_PORT + i);
+ _startCMResultMap.get(hostDest)._manager.getMessagingService()
+ .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
+ manager = _startCMResultMap.get(hostDest)._manager;
+ }
+
+ Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
+ .randomUUID().toString());
+ schedulerMessage.setTgtSessionId("*");
+ schedulerMessage.setTgtName("CONTROLLER");
+ // TODO: change it to "ADMIN" ?
+ schedulerMessage.setSrcName("CONTROLLER");
+ schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg");
+ // Template for the individual message sent to each participant
+ Message msg = new Message(_factory.getMessageType(), "Template");
+ msg.setTgtSessionId("*");
+ msg.setMsgState(MessageState.NEW);
+ // Criteria to send individual messages
+ Criteria cr = new Criteria();
+ cr.setInstanceName("localhost_%");
+ cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ cr.setSessionSpecific(false);
+ cr.setResource("%");
+ cr.setPartition("%");
+
+ ObjectMapper mapper = new ObjectMapper();
+ SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+ StringWriter sw = new StringWriter();
+ mapper.writeValue(sw, cr);
+
+ String crString = sw.toString();
+
+ schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+ schedulerMessage.getRecord().setMapField("MessageTemplate",
+ msg.getRecord().getSimpleFields());
+ schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+ helixDataAccessor.createProperty(
+ keyBuilder.controllerMessage(schedulerMessage.getMsgId()),
+ schedulerMessage);
+
+ for(int i = 0; i < 30; i++)
+ {
+ Thread.sleep(2000);
+ if(_PARTITIONS == _factory._results.size())
+ {
+ break;
+ }
+ }
+
+ Assert.assertEquals(_PARTITIONS, _factory._results.size());
+ PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+ MessageType.SCHEDULER_MSG.toString(), schedulerMessage.getMsgId());
+
+ int messageResultCount = 0;
+ for(int i = 0; i < 10; i++)
+ {
+ ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+ .getRecord();
+ Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
+ .get("MessageCount").equals("" + (_PARTITIONS * 3)));
+ for(String key : statusUpdate.getMapFields().keySet())
+ {
+ if(key.startsWith("MessageResult "))
+ {
+ messageResultCount ++;
+ }
+ }
+ if(messageResultCount == _PARTITIONS * 3)
+ {
+ break;
+ }
+ else
+ {
+ Thread.sleep(2000);
+ }
+ }
+ Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+ int count = 0;
+ for (Set<String> val : _factory._results.values())
+ {
+ count += val.size();
+ }
+ Assert.assertEquals(count, _PARTITIONS * 3);
+
+
+ }
+
@Test()
public void TestSchedulerMsg() throws Exception
{
+ Logger.getRootLogger().setLevel(Level.INFO);
_factory._results.clear();
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++)
@@ -231,7 +337,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
schedulerMessage.setTgtName("CONTROLLER");
// TODO: change it to "ADMIN" ?
schedulerMessage.setSrcName("CONTROLLER");
- schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg");
+ //schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg");
// Template for the individual message sent to each participant
Message msg = new Message(_factory.getMessageType(), "Template");
msg.setTgtSessionId("*");
@@ -290,6 +396,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
if(key.startsWith("MessageResult "))
{
messageResultCount ++;
+ Assert.assertTrue(statusUpdate.getMapField(key).size() > 1);
}
}
if(messageResultCount == _PARTITIONS * 3)
@@ -987,10 +1094,9 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
Assert.assertEquals(count, _PARTITIONS * 3);
- manager.getClusterManagmentTool().setConstraint(manager.getClusterName(),
+ manager.getClusterManagmentTool().removeConstraint(manager.getClusterName(),
ConstraintType.MESSAGE_CONSTRAINT,
- "constraint1",
- new ConstraintItem(new TreeMap<String, String>()));
+ "constraint1");
}
}