You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by sl...@apache.org on 2013/05/17 03:13:35 UTC

git commit: [HELIX-97]Scheduler message summary does not contain individual message results

Updated Branches:
  refs/heads/master 0e9ea9d49 -> 96cc67417


[HELIX-97]Scheduler message summary does not contain individual message
results



Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/96cc6741
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/96cc6741
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/96cc6741

Branch: refs/heads/master
Commit: 96cc674174ae7e401f54bef43c11c0bc0f02986b
Parents: 0e9ea9d
Author: slu2011 <lu...@gmail.com>
Authored: Thu May 16 18:13:20 2013 -0700
Committer: slu2011 <lu...@gmail.com>
Committed: Thu May 16 18:13:20 2013 -0700

----------------------------------------------------------------------
 .../webapp/resources/SchedulerTasksResource.java   |    5 +-
 .../zk/DefaultSchedulerMessageHandlerFactory.java  |    4 +-
 .../helix/integration/TestSchedulerMessage.java    |  114 ++++++++++++++-
 3 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/96cc6741/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
index 097771e..3791a09 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/SchedulerTasksResource.java
@@ -170,11 +170,10 @@ public class SchedulerTasksResource extends Resource
       schedulerMessage.setTgtName("CONTROLLER");
       schedulerMessage.setSrcInstanceType(InstanceType.CONTROLLER);
       String taskQueueName =  ClusterRepresentationUtil.getFormJsonParameterString(form, TASKQUEUENAME);
-      if(taskQueueName != null)
+      if(taskQueueName != null && taskQueueName.length() > 0)
       {
-        throw new HelixException("SchedulerTasksResource need to have " + TASKQUEUENAME +" specified.");
+        schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, taskQueueName);
       }
-      schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, taskQueueName);
       accessor.setProperty(accessor.keyBuilder().controllerMessage(schedulerMessage.getMsgId()), schedulerMessage);
       
       Map<String, String> resultMap = new HashMap<String, String>();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/96cc6741/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
index 49101c9..4fe8750 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
@@ -331,9 +331,9 @@ public class DefaultSchedulerMessageHandlerFactory implements
           _logger.warn("",e);
         }
       }
-      
+      boolean hasSchedulerTaskQueue = _message.getRecord().getSimpleFields().containsKey(SCHEDULER_TASK_QUEUE);
       // If the target is PARTICIPANT, use the ScheduledTaskQueue
-      if(InstanceType.PARTICIPANT == recipientCriteria.getRecipientInstanceType())
+      if(InstanceType.PARTICIPANT == recipientCriteria.getRecipientInstanceType() && hasSchedulerTaskQueue)
       {
         handleMessageUsingScheduledTaskQueue(recipientCriteria, messageTemplate, _message.getMsgId());
         result.setSuccess(true);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/96cc6741/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index cc4f9ea..c867132 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -53,6 +53,8 @@ import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.StatusUpdate;
 import org.apache.helix.monitoring.ZKPathDataDumpTask;
 import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -123,6 +125,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
         HelixTaskResult result = new HelixTaskResult();
         result.setSuccess(true);
         String destName = _message.getTgtName();
+        result.getTaskResultMap().put("Message", _message.getMsgId());
         synchronized (_results)
         {
           if (!_results.containsKey(_message.getPartitionName()))
@@ -190,6 +193,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
       _latch.await();
       HelixTaskResult result = new HelixTaskResult();
       result.setSuccess(true);
+      result.getTaskResultMap().put("Message", _message.getMsgId());
       String destName = _message.getTgtName();
       synchronized (_results)
       {
@@ -211,10 +215,112 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     }
   }
 }
+  
+  @Test()
+  public void TestSchedulerMsgUsingQueue() throws Exception
+  {
+    Logger.getRootLogger().setLevel(Level.INFO);
+    _factory._results.clear();
+    HelixManager manager = null;
+    for (int i = 0; i < NODE_NR; i++)
+    {
+      String hostDest = "localhost_" + (START_PORT + i);
+      _startCMResultMap.get(hostDest)._manager.getMessagingService()
+          .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
+      manager = _startCMResultMap.get(hostDest)._manager;
+    }
+
+    Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
+        .randomUUID().toString());
+    schedulerMessage.setTgtSessionId("*");
+    schedulerMessage.setTgtName("CONTROLLER");
+    // TODO: change it to "ADMIN" ?
+    schedulerMessage.setSrcName("CONTROLLER");
+    schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg");
+    // Template for the individual message sent to each participant
+    Message msg = new Message(_factory.getMessageType(), "Template");
+    msg.setTgtSessionId("*");
+    msg.setMsgState(MessageState.NEW);
 
+    // Criteria to send individual messages
+    Criteria cr = new Criteria();
+    cr.setInstanceName("localhost_%");
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(false);
+    cr.setResource("%");
+    cr.setPartition("%");
+
+    ObjectMapper mapper = new ObjectMapper();
+    SerializationConfig serializationConfig = mapper.getSerializationConfig();
+    serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
+
+    StringWriter sw = new StringWriter();
+    mapper.writeValue(sw, cr);
+
+    String crString = sw.toString();
+
+    schedulerMessage.getRecord().setSimpleField("Criteria", crString);
+    schedulerMessage.getRecord().setMapField("MessageTemplate",
+        msg.getRecord().getSimpleFields());
+    schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
+
+    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = helixDataAccessor.keyBuilder();
+    helixDataAccessor.createProperty(
+        keyBuilder.controllerMessage(schedulerMessage.getMsgId()),
+        schedulerMessage);
+    
+    for(int i = 0; i < 30; i++)
+    {
+      Thread.sleep(2000);
+      if(_PARTITIONS == _factory._results.size())
+      {
+        break;
+      }
+    }
+
+    Assert.assertEquals(_PARTITIONS, _factory._results.size());
+    PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
+        MessageType.SCHEDULER_MSG.toString(), schedulerMessage.getMsgId());
+      
+    int messageResultCount = 0;
+    for(int i = 0; i < 10; i++)
+    {
+      ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
+          .getRecord();
+      Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
+          .get("MessageCount").equals("" + (_PARTITIONS * 3)));
+      for(String key : statusUpdate.getMapFields().keySet())
+      {
+        if(key.startsWith("MessageResult "))
+        {
+          messageResultCount ++;
+        }
+      }
+      if(messageResultCount == _PARTITIONS * 3)
+      {
+        break;
+      }
+      else
+      {
+        Thread.sleep(2000);
+      }
+    }
+    Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
+    int count = 0;
+    for (Set<String> val : _factory._results.values())
+    {
+      count += val.size();
+    }
+    Assert.assertEquals(count, _PARTITIONS * 3);
+    
+   
+  }
+  
   @Test()
   public void TestSchedulerMsg() throws Exception
   {
+    Logger.getRootLogger().setLevel(Level.INFO);
     _factory._results.clear();
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++)
@@ -231,7 +337,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     schedulerMessage.setTgtName("CONTROLLER");
     // TODO: change it to "ADMIN" ?
     schedulerMessage.setSrcName("CONTROLLER");
-    schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg");
+    //schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg");
     // Template for the individual message sent to each participant
     Message msg = new Message(_factory.getMessageType(), "Template");
     msg.setTgtSessionId("*");
@@ -290,6 +396,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
         if(key.startsWith("MessageResult "))
         {
           messageResultCount ++;
+          Assert.assertTrue(statusUpdate.getMapField(key).size() > 1);
         }
       }
       if(messageResultCount == _PARTITIONS * 3)
@@ -987,10 +1094,9 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     }
     Assert.assertEquals(count, _PARTITIONS * 3);
     
-    manager.getClusterManagmentTool().setConstraint(manager.getClusterName(), 
+    manager.getClusterManagmentTool().removeConstraint(manager.getClusterName(), 
                                                     ConstraintType.MESSAGE_CONSTRAINT,
-                                                    "constraint1", 
-                                                    new ConstraintItem(new TreeMap<String, String>()));
+                                                    "constraint1");
     
   }
 }