You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC

[16/42] Refactoring the package names and removing jsql parser

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/package-info.java b/helix-core/src/main/java/org/apache/helix/manager/zk/package-info.java
new file mode 100644
index 0000000..08516e8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * ZooKeeper-based implementation of the Helix cluster manager.
+ * 
+ */
+package org.apache.helix.manager.zk;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java b/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java
new file mode 100644
index 0000000..5a520a9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java
@@ -0,0 +1,168 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Base class for callbacks that collect replies to messages sent through the messaging
+ * service. Subclasses implement {@link #onReplyMessage(Message)} and {@link #onTimeOut()}.
+ *
+ * <p>{@link #onReply(Message)} and the timeout task both run while holding this object's
+ * monitor and call {@code notifyAll()}, so a thread may wait on this object until
+ * {@link #isDone()} or {@link #isTimedOut()} becomes true.
+ */
+public abstract class AsyncCallback
+{
+
+  private static Logger _logger = Logger.getLogger(AsyncCallback.class);
+  // Time (ms since epoch) at which the timer was first started; 0 until then.
+  long _startTimeStamp = 0;
+  // Timeout in ms; values <= 0 mean startTimer() does not start a timer.
+  protected long _timeout = -1;
+  Timer _timer = null;
+  // Messages whose replies are awaited; null until setMessagesSent() is called.
+  List<Message> _messagesSent;
+  protected final List<Message> _messageReplied = new ArrayList<Message>();
+  // volatile: written by the timer thread / waiting thread and read by others without the monitor.
+  volatile boolean _timedOut = false;
+  volatile boolean _isInterrupted = false;
+
+  /**
+   * Enforcing timeout to be set
+   *
+   * @param timeout timeout in milliseconds; a value <= 0 means no timeout timer is started
+   */
+  public AsyncCallback(long timeout)
+  {
+    _logger.info("Setting time out to " + timeout + " ms");
+    _timeout = timeout;
+  }
+
+  /** Creates a callback with no timeout (-1). */
+  public AsyncCallback()
+  {
+    this(-1);
+  }
+
+  /**
+   * Sets the timeout used when the timer is started.
+   *
+   * @param timeout timeout in milliseconds; a value <= 0 disables the timer
+   */
+  public final void setTimeout(long timeout)
+  {
+    _logger.info("Setting time out to " + timeout + " ms");
+    _timeout = timeout;
+  }
+
+  /** @return the live list of replies received so far (not a copy) */
+  public List<Message> getMessageReplied()
+  {
+    return _messageReplied;
+  }
+
+  /** @return whether the thread waiting on this callback was marked as interrupted */
+  public boolean isInterrupted()
+  {
+    return _isInterrupted;
+  }
+
+  /**
+   * Records whether the thread waiting on this callback was interrupted.
+   *
+   * @param b the new value of the interrupted flag
+   */
+  public void setInterrupted(boolean b)
+  {
+    _isInterrupted = b;
+  }
+
+  /**
+   * Records a reply and forwards it to {@link #onReplyMessage(Message)} unless the callback is
+   * already done. Once done, cancels the timeout timer and wakes every thread waiting on this
+   * object.
+   *
+   * @param message the reply message
+   */
+  public synchronized final void onReply(Message message)
+  {
+    _logger.info("OnReply msg " + message.getMsgId());
+    if (!isDone())
+    {
+      _messageReplied.add(message);
+      try
+      {
+        onReplyMessage(message);
+      }
+      catch (Exception e)
+      {
+        // A failing subclass handler must not prevent the completion bookkeeping below.
+        _logger.error("Exception in onReplyMessage for msg " + message.getMsgId(), e);
+      }
+    }
+    if (isDone())
+    {
+      if (_timer != null)
+      {
+        _timer.cancel();
+      }
+      notifyAll();
+    }
+  }
+
+  /**
+   * Default implementation will wait until every message sent gets a response
+   *
+   * @return true once the number of replies equals the number of messages sent; false while
+   *         the sent messages have not been set yet
+   */
+  public boolean isDone()
+  {
+    return _messagesSent != null && _messageReplied.size() == _messagesSent.size();
+  }
+
+  /** @return whether the timeout task has fired */
+  public boolean isTimedOut()
+  {
+    return _timedOut;
+  }
+
+  /** Sets the messages whose replies this callback waits for. */
+  final void setMessagesSent(List<Message> generatedMessage)
+  {
+    _messagesSent = generatedMessage;
+  }
+
+  /**
+   * Starts the timeout timer once, if a positive timeout is configured. The timer runs on a
+   * daemon thread and fires a single {@link TimeoutTask} after {@code _timeout} ms.
+   */
+  final void startTimer()
+  {
+    if (_timer == null && _timeout > 0)
+    {
+      if (_startTimeStamp == 0)
+      {
+        _startTimeStamp = System.currentTimeMillis();
+      }
+      _timer = new Timer(true);
+      _timer.schedule(new TimeoutTask(this), _timeout);
+    }
+  }
+
+  /** Invoked (while holding this callback's monitor) when the timeout elapses. */
+  public abstract void onTimeOut();
+
+  /** Invoked (while holding this callback's monitor) for each reply accepted before completion. */
+  public abstract void onReplyMessage(Message message);
+
+  /** Marks the callback as timed out, wakes waiters and invokes {@link #onTimeOut()}. */
+  static class TimeoutTask extends TimerTask
+  {
+    AsyncCallback _callback;
+
+    public TimeoutTask(AsyncCallback asyncCallback)
+    {
+      _callback = asyncCallback;
+    }
+
+    @Override
+    public void run()
+    {
+      try
+      {
+        synchronized (_callback)
+        {
+          _callback._timedOut = true;
+          _callback.notifyAll();
+          _callback.onTimeOut();
+        }
+      }
+      catch (Exception e)
+      {
+        _logger.error("Exception in onTimeOut", e);
+      }
+      finally
+      {
+        // startTimer() schedules exactly one task per Timer; cancelling it here lets the
+        // timer's daemon thread exit instead of idling after the timeout has fired.
+        Timer timer = _callback._timer;
+        if (timer != null)
+        {
+          timer.cancel();
+        }
+      }
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
new file mode 100644
index 0000000..d4cc9b0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.DataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.Criteria.DataSource;
+import org.apache.helix.josql.ClusterJosqlQueryProcessor;
+import org.apache.helix.josql.ZNRecordRow;
+import org.apache.log4j.Logger;
+import org.josql.Query;
+import org.josql.QueryExecutionException;
+import org.josql.QueryParseException;
+import org.josql.QueryResults;
+
+
+public class CriteriaEvaluator
+{
+  private static Logger logger = Logger.getLogger(CriteriaEvaluator.class);
+  
+  public List<Map<String, String>> evaluateCriteria(Criteria recipientCriteria, HelixManager manager)
+  {
+    List<Map<String, String>> selected = new ArrayList<Map<String, String>>();
+    
+    String queryFields = 
+        (!recipientCriteria.getInstanceName().equals("")  ? " " + ZNRecordRow.MAP_SUBKEY  : " ''") +","+
+        (!recipientCriteria.getResource().equals("") ? " " + ZNRecordRow.ZNRECORD_ID : " ''") +","+
+        (!recipientCriteria.getPartition().equals("")   ? " " + ZNRecordRow.MAP_KEY   : " ''") +","+
+        (!recipientCriteria.getPartitionState().equals("") ? " " + ZNRecordRow.MAP_VALUE : " '' ");
+    
+    String matchCondition = 
+        ZNRecordRow.MAP_SUBKEY   + " LIKE '" + (!recipientCriteria.getInstanceName().equals("") ? (recipientCriteria.getInstanceName() +"'") :   "%' ") + " AND "+
+        ZNRecordRow.ZNRECORD_ID+ " LIKE '" + (!recipientCriteria.getResource().equals("") ? (recipientCriteria.getResource() +"'") : "%' ") + " AND "+
+        ZNRecordRow.MAP_KEY   + " LIKE '" + (!recipientCriteria.getPartition().equals("")   ? (recipientCriteria.getPartition()  +"'") :  "%' ") + " AND "+
+        ZNRecordRow.MAP_VALUE  + " LIKE '" + (!recipientCriteria.getPartitionState().equals("") ? (recipientCriteria.getPartitionState()+"'") :  "%' ") + " AND "+
+        ZNRecordRow.MAP_SUBKEY   + " IN ((SELECT [*]id FROM :LIVEINSTANCES))";
+        
+    
+    String queryTarget = recipientCriteria.getDataSource().toString() + ClusterJosqlQueryProcessor.FLATTABLE;
+    
+    String josql = "SELECT DISTINCT " + queryFields
+                 + " FROM " + queryTarget + " WHERE "
+                 + matchCondition;
+    ClusterJosqlQueryProcessor p = new ClusterJosqlQueryProcessor(manager);
+    List<Object> result = new ArrayList<Object>();
+    try
+    {
+      logger.info("JOSQL query: " + josql);
+      result = p.runJoSqlQuery(josql, null, null);
+    } 
+    catch (Exception e)
+    {
+      logger.error("", e);
+      return selected;
+    } 
+    
+    for(Object o : result)
+    {
+      Map<String, String> resultRow = new HashMap<String, String>();
+      List<Object> row = (List<Object>)o;
+      resultRow.put("instanceName", (String)(row.get(0)));
+      resultRow.put("resourceName", (String)(row.get(1)));
+      resultRow.put("partitionName", (String)(row.get(2)));
+      resultRow.put("partitionState", (String)(row.get(3)));
+      selected.add(resultRow);
+    }
+    logger.info("JOSQL query return " + selected.size() + " rows");
+    return selected;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
new file mode 100644
index 0000000..b589874
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -0,0 +1,392 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.messaging.handling.AsyncCallbackService;
+import org.apache.helix.messaging.handling.HelixTaskExecutor;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Default {@link ClusterMessagingService}. Resolves a {@link Criteria} into concrete messages,
+ * writes them to the controller or participant message paths through the manager's
+ * {@link HelixDataAccessor}, and routes TASK_REPLY messages back to the registered
+ * {@link AsyncCallback} through an {@link AsyncCallbackService}.
+ */
+public class DefaultMessagingService implements ClusterMessagingService
+{
+  private final HelixManager         _manager;
+  private final CriteriaEvaluator    _evaluator;
+  private final HelixTaskExecutor    _taskExecutor;
+  // TODO:rename to factory, this is not a service
+  private final AsyncCallbackService _asyncCallbackService;
+  private static Logger              _logger = Logger.getLogger(DefaultMessagingService.class);
+  // Factories registered while the manager was not connected; registered by onConnected().
+  ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded
+    = new ConcurrentHashMap<String, MessageHandlerFactory>();
+
+  public DefaultMessagingService(HelixManager manager)
+  {
+    _manager = manager;
+    _evaluator = new CriteriaEvaluator();
+    _taskExecutor = new HelixTaskExecutor();
+    _asyncCallbackService = new AsyncCallbackService();
+    // Replies to messages sent with a callback are dispatched through this factory.
+    _taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(),
+                                                _asyncCallbackService);
+  }
+
+  @Override
+  public int send(Criteria recipientCriteria, final Message messageTemplate)
+  {
+    return send(recipientCriteria, messageTemplate, null, -1);
+  }
+
+  @Override
+  public int send(final Criteria recipientCriteria,
+                  final Message message,
+                  AsyncCallback callbackOnReply,
+                  int timeOut)
+  {
+    return send(recipientCriteria, message, callbackOnReply, timeOut, 0);
+  }
+
+  /**
+   * Generates one message per recipient matching {@code recipientCriteria} and writes them.
+   *
+   * @param callbackOnReply optional callback; when non-null it is registered under a fresh
+   *          correlation id and its timer is started after all messages are written
+   * @param timeOut execution timeout set on every message; the callback timeout is
+   *          {@code timeOut * (retryCount + 1)}, or -1 if that product is negative
+   * @param retryCount retry count set on every message
+   * @return the number of messages written (0 if no recipient matched)
+   */
+  @Override
+  public int send(final Criteria recipientCriteria,
+                  final Message message,
+                  AsyncCallback callbackOnReply,
+                  int timeOut,
+                  int retryCount)
+  {
+    Map<InstanceType, List<Message>> generateMessage =
+        generateMessage(recipientCriteria, message);
+    // Flatten once: used both for the count and as the set of replies the callback awaits.
+    List<Message> allMessages = new ArrayList<Message>();
+    for (List<Message> messages : generateMessage.values())
+    {
+      allMessages.addAll(messages);
+    }
+    int totalMessageCount = allMessages.size();
+    _logger.info("Send " + totalMessageCount + " messages with criteria "
+        + recipientCriteria);
+    if (totalMessageCount == 0)
+    {
+      return 0;
+    }
+    String correlationId = null;
+    if (callbackOnReply != null)
+    {
+      // Computed in long so large timeouts/retry counts cannot overflow into a bogus value.
+      long totalTimeout = (long) timeOut * ((long) retryCount + 1);
+      if (totalTimeout < 0)
+      {
+        totalTimeout = -1;
+      }
+      callbackOnReply.setTimeout(totalTimeout);
+      correlationId = UUID.randomUUID().toString();
+      callbackOnReply.setMessagesSent(allMessages);
+      _asyncCallbackService.registerAsyncCallback(correlationId, callbackOnReply);
+    }
+
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+    InstanceType srcInstanceType = _manager.getInstanceType();
+    for (Map.Entry<InstanceType, List<Message>> entry : generateMessage.entrySet())
+    {
+      InstanceType receiverType = entry.getKey();
+      for (Message tempMessage : entry.getValue())
+      {
+        tempMessage.setRetryCount(retryCount);
+        tempMessage.setExecutionTimeout(timeOut);
+        tempMessage.setSrcInstanceType(srcInstanceType);
+        if (correlationId != null)
+        {
+          tempMessage.setCorrelationId(correlationId);
+        }
+
+        if (receiverType == InstanceType.CONTROLLER)
+        {
+          accessor.setProperty(keyBuilder.controllerMessage(tempMessage.getId()),
+                               tempMessage);
+        }
+        else if (receiverType == InstanceType.PARTICIPANT)
+        {
+          accessor.setProperty(keyBuilder.message(tempMessage.getTgtName(),
+                                                  tempMessage.getId()),
+                               tempMessage);
+        }
+      }
+    }
+
+    if (callbackOnReply != null)
+    {
+      // start timer if timeout is set
+      callbackOnReply.startTimer();
+    }
+    return totalMessageCount;
+  }
+
+  /**
+   * Builds the messages to send, keyed by receiver type. For participants, one message is
+   * created per row returned by the {@link CriteriaEvaluator} (skipping this instance when the
+   * criteria excludes self); no PARTICIPANT entry is added when nothing matched.
+   */
+  private Map<InstanceType, List<Message>> generateMessage(final Criteria recipientCriteria,
+                                                           final Message message)
+  {
+    Map<InstanceType, List<Message>> messagesToSendMap =
+        new HashMap<InstanceType, List<Message>>();
+    InstanceType instanceType = recipientCriteria.getRecipientInstanceType();
+
+    if (instanceType == InstanceType.CONTROLLER)
+    {
+      messagesToSendMap.put(InstanceType.CONTROLLER, generateMessagesForController(message));
+    }
+    else if (instanceType == InstanceType.PARTICIPANT)
+    {
+      List<Map<String, String>> matchedList =
+          _evaluator.evaluateCriteria(recipientCriteria, _manager);
+
+      if (!matchedList.isEmpty())
+      {
+        // Live-instance name -> session id, only needed for session-specific messages.
+        Map<String, String> sessionIdMap = new HashMap<String, String>();
+        if (recipientCriteria.isSessionSpecific())
+        {
+          HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+          Builder keyBuilder = accessor.keyBuilder();
+
+          List<LiveInstance> liveInstances =
+              accessor.getChildValues(keyBuilder.liveInstances());
+
+          for (LiveInstance liveInstance : liveInstances)
+          {
+            sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
+          }
+        }
+
+        String srcInstanceName = _manager.getInstanceName();
+        List<Message> messages = new ArrayList<Message>();
+        for (Map<String, String> map : matchedList)
+        {
+          String tgtInstanceName = map.get("instanceName");
+          // Don't send message to self
+          if (recipientCriteria.isSelfExcluded()
+              && srcInstanceName.equalsIgnoreCase(tgtInstanceName))
+          {
+            continue;
+          }
+          Message newMessage = new Message(message.getRecord(), UUID.randomUUID().toString());
+          newMessage.setSrcName(srcInstanceName);
+          newMessage.setTgtName(tgtInstanceName);
+          newMessage.setResourceName(map.get("resourceName"));
+          newMessage.setPartitionName(map.get("partitionName"));
+          if (recipientCriteria.isSessionSpecific())
+          {
+            newMessage.setTgtSessionId(sessionIdMap.get(tgtInstanceName));
+          }
+          messages.add(newMessage);
+        }
+        messagesToSendMap.put(InstanceType.PARTICIPANT, messages);
+      }
+    }
+    return messagesToSendMap;
+  }
+
+  /** Creates the single copy of {@code message} addressed to the controller. */
+  private List<Message> generateMessagesForController(Message message)
+  {
+    List<Message> messages = new ArrayList<Message>();
+    String id = UUID.randomUUID().toString();
+    Message newMessage = new Message(message.getRecord(), id);
+    newMessage.setMsgId(id);
+    newMessage.setSrcName(_manager.getInstanceName());
+    newMessage.setTgtName("Controller");
+    messages.add(newMessage);
+    return messages;
+  }
+
+  /**
+   * Registers a handler factory immediately if the manager is connected; otherwise queues it
+   * until {@link #onConnected()} is called.
+   */
+  @Override
+  public synchronized void registerMessageHandlerFactory(String type, MessageHandlerFactory factory)
+  {
+    if (_manager.isConnected())
+    {
+      registerMessageHandlerFactoryInternal(type, factory);
+    }
+    else
+    {
+      _messageHandlerFactoriestobeAdded.put(type, factory);
+    }
+  }
+
+  /** Registers every factory queued while disconnected, then clears the queue. */
+  public synchronized void onConnected()
+  {
+    for (Map.Entry<String, MessageHandlerFactory> entry : _messageHandlerFactoriestobeAdded.entrySet())
+    {
+      registerMessageHandlerFactoryInternal(entry.getKey(), entry.getValue());
+    }
+    _messageHandlerFactoriestobeAdded.clear();
+  }
+
+  /**
+   * Registers {@code factory} with the task executor using a per-type thread pool size read
+   * from config key {@code <type>.<MAX_THREADS>}: participant scope first (for participant
+   * managers), then cluster scope, else {@link HelixTaskExecutor#DEFAULT_PARALLEL_TASKS}.
+   */
+  void registerMessageHandlerFactoryInternal(String type, MessageHandlerFactory factory)
+  {
+    _logger.info("registering msg factory for type " + type);
+    int threadpoolSize = HelixTaskExecutor.DEFAULT_PARALLEL_TASKS;
+    String threadpoolSizeStr = null;
+    String key = type + "." + HelixTaskExecutor.MAX_THREADS;
+
+    ConfigAccessor configAccessor = _manager.getConfigAccessor();
+    if (configAccessor != null)
+    {
+      // Read the participant config and cluster config for the per-message type thread pool size.
+      // participant config will override the cluster config.
+      InstanceType instanceType = _manager.getInstanceType();
+      if (instanceType == InstanceType.PARTICIPANT
+          || instanceType == InstanceType.CONTROLLER_PARTICIPANT)
+      {
+        ConfigScope scope = new ConfigScopeBuilder().forCluster(_manager.getClusterName())
+            .forParticipant(_manager.getInstanceName()).build();
+        threadpoolSizeStr = configAccessor.get(scope, key);
+      }
+
+      if (threadpoolSizeStr == null)
+      {
+        ConfigScope scope = new ConfigScopeBuilder().forCluster(_manager.getClusterName()).build();
+        threadpoolSizeStr = configAccessor.get(scope, key);
+      }
+    }
+
+    if (threadpoolSizeStr != null)
+    {
+      try
+      {
+        // Non-positive configured sizes are clamped to a single thread.
+        threadpoolSize = Math.max(1, Integer.parseInt(threadpoolSizeStr));
+      }
+      catch (NumberFormatException e)
+      {
+        _logger.error("Invalid thread pool size '" + threadpoolSizeStr + "' for " + key
+            + ", using default " + threadpoolSize, e);
+      }
+    }
+
+    _taskExecutor.registerMessageHandlerFactory(type, factory, threadpoolSize);
+    // Self-send a no-op message so onMessage() is invoked again; this gives the new factory a
+    // chance to process messages of its type that arrived before it was registered.
+    sendNopMessage();
+  }
+
+  /**
+   * Writes a NO_OP message addressed to this manager (controller and/or participant message
+   * path, depending on the instance type). Does nothing when disconnected; failures are logged.
+   */
+  public void sendNopMessage()
+  {
+    if (!_manager.isConnected())
+    {
+      return;
+    }
+    try
+    {
+      Message nopMsg = new Message(MessageType.NO_OP, UUID.randomUUID().toString());
+      nopMsg.setSrcName(_manager.getInstanceName());
+
+      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+      Builder keyBuilder = accessor.keyBuilder();
+      InstanceType instanceType = _manager.getInstanceType();
+
+      if (instanceType == InstanceType.CONTROLLER
+          || instanceType == InstanceType.CONTROLLER_PARTICIPANT)
+      {
+        nopMsg.setTgtName("Controller");
+        accessor.setProperty(keyBuilder.controllerMessage(nopMsg.getId()), nopMsg);
+      }
+
+      if (instanceType == InstanceType.PARTICIPANT
+          || instanceType == InstanceType.CONTROLLER_PARTICIPANT)
+      {
+        nopMsg.setTgtName(_manager.getInstanceName());
+        accessor.setProperty(keyBuilder.message(nopMsg.getTgtName(), nopMsg.getId()),
+                             nopMsg);
+      }
+    }
+    catch (Exception e)
+    {
+      _logger.error("Failed to send NO_OP message", e);
+    }
+  }
+
+  public HelixTaskExecutor getExecutor()
+  {
+    return _taskExecutor;
+  }
+
+  /**
+   * Sends the messages and blocks until {@code asyncCallback} is done, times out, or the
+   * calling thread is interrupted (in which case the callback is marked interrupted and the
+   * thread's interrupt status is restored).
+   */
+  @Override
+  public int sendAndWait(Criteria receipientCriteria,
+                         Message message,
+                         AsyncCallback asyncCallback,
+                         int timeOut,
+                         int retryCount)
+  {
+    int messagesSent =
+        send(receipientCriteria, message, asyncCallback, timeOut, retryCount);
+    if (messagesSent > 0)
+    {
+      // Check and wait under the callback's monitor: onReply() and the timeout task both call
+      // notifyAll() while holding it, so a completion or timeout can no longer slip in between
+      // the check and wait() and leave this thread waiting forever.
+      synchronized (asyncCallback)
+      {
+        while (!asyncCallback.isDone() && !asyncCallback.isTimedOut())
+        {
+          try
+          {
+            asyncCallback.wait();
+          }
+          catch (InterruptedException e)
+          {
+            _logger.error("Interrupted while waiting for replies", e);
+            asyncCallback.setInterrupted(true);
+            Thread.currentThread().interrupt();
+            break;
+          }
+        }
+      }
+    }
+    else
+    {
+      _logger.warn("No messages sent. For Criteria:" + receipientCriteria);
+    }
+    return messagesSent;
+  }
+
+  @Override
+  public int sendAndWait(Criteria recipientCriteria,
+                         Message message,
+                         AsyncCallback asyncCallback,
+                         int timeOut)
+  {
+    return sendAndWait(recipientCriteria, message, asyncCallback, timeOut, 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
new file mode 100644
index 0000000..6ad31d9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
@@ -0,0 +1,149 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging.handling;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.Map;
+import java.util.Random;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.log4j.Logger;
+
+
+public class AsyncCallbackService implements MessageHandlerFactory
+{
+  private final ConcurrentHashMap<String, AsyncCallback> _callbackMap = new ConcurrentHashMap<String, AsyncCallback>();
+  private static Logger _logger = Logger.getLogger(AsyncCallbackService.class);
+
+  public AsyncCallbackService()
+  {
+  }
+
+  public void registerAsyncCallback(String correlationId, AsyncCallback callback)
+  {
+    if (_callbackMap.containsKey(correlationId))
+    {
+      _logger.warn("correlation id " + correlationId + " already registered");
+    }
+    _logger.info("registering correlation id " + correlationId);
+    _callbackMap.put(correlationId, callback);
+  }
+
+  void verifyMessage(Message message)
+  {
+    if (!message.getMsgType().toString()
+        .equalsIgnoreCase(MessageType.TASK_REPLY.toString()))
+    {
+      String errorMsg = "Unexpected msg type for message " + message.getMsgId()
+          + " type:" + message.getMsgType() + " Expected : "
+          + MessageType.TASK_REPLY;
+      _logger.error(errorMsg);
+      throw new HelixException(errorMsg);
+    }
+    String correlationId = message.getCorrelationId();
+    if (correlationId == null)
+    {
+      String errorMsg = "Message " + message.getMsgId()
+          + " does not have correlation id";
+      _logger.error(errorMsg);
+      throw new HelixException(errorMsg);
+    }
+
+    if (!_callbackMap.containsKey(correlationId))
+    {
+      String errorMsg = "Message "
+          + message.getMsgId()
+          + " does not have correponding callback. Probably timed out already. Correlation id: "
+          + correlationId;
+      _logger.error(errorMsg);
+      throw new HelixException(errorMsg);
+    }
+    _logger.info("Verified reply message " + message.getMsgId()
+        + " correlation:" + correlationId);
+  }
+
+  @Override
+  public MessageHandler createHandler(Message message,
+      NotificationContext context)
+  {
+    verifyMessage(message);
+    return new AsyncCallbackMessageHandler(message.getCorrelationId(),message, context);
+  }
+
+  @Override
+  public String getMessageType()
+  {
+    return MessageType.TASK_REPLY.toString();
+  }
+
+  @Override
+  public void reset()
+  {
+
+  }
+
+  public class AsyncCallbackMessageHandler extends MessageHandler
+  {
+    private final String _correlationId;
+
+    public AsyncCallbackMessageHandler(String correlationId, Message message, NotificationContext context)
+    {
+      super(message, context);
+      _correlationId = correlationId;
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() throws InterruptedException
+    {
+      verifyMessage(_message);
+      HelixTaskResult result = new HelixTaskResult();
+      assert (_correlationId.equalsIgnoreCase(_message.getCorrelationId()));
+      _logger.info("invoking reply message " + _message.getMsgId()
+          + ", correlationid:" + _correlationId);
+
+      AsyncCallback callback = _callbackMap.get(_correlationId);
+      synchronized (callback)
+      {
+        callback.onReply(_message);
+        if (callback.isDone())
+        {
+          _logger.info("Removing finished callback, correlationid:"
+              + _correlationId);
+          _callbackMap.remove(_correlationId);
+        }
+      }
+      result.setSuccess(true);
+      return result;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode code, ErrorType type)
+    {
+      _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/GroupMessageHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/GroupMessageHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/GroupMessageHandler.java
new file mode 100644
index 0000000..a6ee6f0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/GroupMessageHandler.java
@@ -0,0 +1,116 @@
+package org.apache.helix.messaging.handling;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.Attributes;
+
+
+/**
+ * Tracks group messages: a parent message whose work completes through sub-messages that
+ * point back to it via the PARENT_MSG_ID attribute. The parent is considered complete after
+ * as many sub-message completions as it has partition names. Current-state updates produced
+ * by sub-messages are collected so they can be merged per property key.
+ */
+public class GroupMessageHandler
+{
+  /** A current-state delta destined for a given property key. */
+  static class CurrentStateUpdate
+  {
+    final PropertyKey  _key;
+    final CurrentState _curStateDelta;
+
+    public CurrentStateUpdate(PropertyKey key, CurrentState curStateDelta)
+    {
+      _key = key;
+      _curStateDelta = curStateDelta;
+    }
+
+    /** Merges {@code curState}'s record into this update's delta (mutates it in place). */
+    public void merge(CurrentState curState)
+    {
+      _curStateDelta.getRecord().merge(curState.getRecord());
+    }
+  }
+
+  /** Per-parent bookkeeping: remaining sub-message count and collected updates. */
+  static class GroupMessageInfo
+  {
+    final Message                                   _message;
+    final AtomicInteger                             _countDown;
+    final ConcurrentLinkedQueue<CurrentStateUpdate> _curStateUpdateList;
+
+    public GroupMessageInfo(Message message)
+    {
+      _message = message;
+      List<String> partitionNames = message.getPartitionNames();
+      _countDown = new AtomicInteger(partitionNames.size());
+      _curStateUpdateList = new ConcurrentLinkedQueue<CurrentStateUpdate>();
+    }
+
+    /**
+     * Combines the collected updates by key path. The first update seen for a path absorbs
+     * the later ones (its delta is mutated in place).
+     *
+     * @return one merged current-state delta per property key
+     */
+    public Map<PropertyKey, CurrentState> merge()
+    {
+      Map<String, CurrentStateUpdate> curStateUpdateMap =
+          new HashMap<String, CurrentStateUpdate>();
+      for (CurrentStateUpdate update : _curStateUpdateList)
+      {
+        String path = update._key.getPath();
+        CurrentStateUpdate existing = curStateUpdateMap.get(path);
+        if (existing == null)
+        {
+          curStateUpdateMap.put(path, update);
+        }
+        else
+        {
+          existing.merge(update._curStateDelta);
+        }
+      }
+
+      Map<PropertyKey, CurrentState> ret = new HashMap<PropertyKey, CurrentState>();
+      for (CurrentStateUpdate update : curStateUpdateMap.values())
+      {
+        ret.put(update._key, update._curStateDelta);
+      }
+
+      return ret;
+    }
+
+  }
+
+  // Parent message id -> tracking info.
+  final ConcurrentHashMap<String, GroupMessageInfo> _groupMsgMap;
+
+  public GroupMessageHandler()
+  {
+    _groupMsgMap = new ConcurrentHashMap<String, GroupMessageInfo>();
+  }
+
+  /** Starts tracking {@code message}; a message id that is already tracked is left as is. */
+  public void put(Message message)
+  {
+    _groupMsgMap.putIfAbsent(message.getId(), new GroupMessageInfo(message));
+  }
+
+  /**
+   * Counts one completed sub-message against its parent.
+   *
+   * @return the parent's info (and stops tracking it) once all sub-messages have completed;
+   *         null otherwise, including when the sub-message has no PARENT_MSG_ID attribute or
+   *         its parent is not tracked
+   */
+  public GroupMessageInfo onCompleteSubMessage(Message subMessage)
+  {
+    String parentMid = subMessage.getAttribute(Attributes.PARENT_MSG_ID);
+    // ConcurrentHashMap rejects null keys, so a missing parent id must be handled here.
+    if (parentMid == null)
+    {
+      return null;
+    }
+    GroupMessageInfo info = _groupMsgMap.get(parentMid);
+    if (info != null)
+    {
+      int val = info._countDown.decrementAndGet();
+      if (val <= 0)
+      {
+        return _groupMsgMap.remove(parentMid);
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Records a current-state delta for the sub-message's parent; ignored when the sub-message
+   * has no PARENT_MSG_ID attribute or the parent is not tracked.
+   */
+  void addCurStateUpdate(Message subMessage, PropertyKey key, CurrentState delta)
+  {
+    String parentMid = subMessage.getAttribute(Attributes.PARENT_MSG_ID);
+    if (parentMid == null)
+    {
+      return;
+    }
+    GroupMessageInfo info = _groupMsgMap.get(parentMid);
+    if (info != null)
+    {
+      info._curStateUpdateList.add(new CurrentStateUpdate(key, delta));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
new file mode 100644
index 0000000..102a984
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -0,0 +1,388 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging.handling;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecordBucketizer;
+import org.apache.helix.ZNRecordDelta;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecordDelta.MergeOperation;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelParser;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.util.StatusUpdateUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Executes a state-transition message by reflectively invoking the matching transition
+ * method on a {@link StateModel}. It then records the resulting partition state in the
+ * current-state delta and persists it, either directly or through the executor's group
+ * message handler when the message runs in group-message mode.
+ */
+public class HelixStateTransitionHandler extends MessageHandler
+{
+  /**
+   * Signals that the fromState carried by a message does not match the partition's state
+   * recorded in the current-state delta.
+   */
+  public static class HelixStateMismatchException extends Exception
+  {
+    public HelixStateMismatchException(String info)
+    {
+      super(info);
+    }
+  }
+
+  private static final Logger     logger = Logger.getLogger(HelixStateTransitionHandler.class);
+  private final StateModel        _stateModel;
+  StatusUpdateUtil                _statusUpdateUtil;
+  private final StateModelParser  _transitionMethodFinder;
+  private final CurrentState      _currentStateDelta;
+  /** Set by {@link #onTimeout()}; used to classify an interrupted transition as a timeout. */
+  volatile boolean                _isTimeout = false;
+  private final HelixTaskExecutor _executor;
+
+  /**
+   * @param stateModel        the state model whose transition method will be invoked
+   * @param message           the state-transition message to execute
+   * @param context           the notification context the message arrived with
+   * @param currentStateDelta the current-state record this handler reads and updates
+   * @param executor          the owning executor (provides the group message handler)
+   */
+  public HelixStateTransitionHandler(StateModel stateModel,
+                                     Message message,
+                                     NotificationContext context,
+                                     CurrentState currentStateDelta,
+                                     HelixTaskExecutor executor)
+  {
+    super(message, context);
+    _stateModel = stateModel;
+    _statusUpdateUtil = new StatusUpdateUtil();
+    _transitionMethodFinder = new StateModelParser();
+    _currentStateDelta = currentStateDelta;
+    _executor = executor;
+  }
+
+  /**
+   * Validates the message before the transition runs.
+   *
+   * @throws HelixException              if the message is missing required fields
+   * @throws HelixStateMismatchException if the message's fromState (other than null or "*")
+   *                                     does not match, case-insensitively, the partition's
+   *                                     state in the current-state delta
+   */
+  private void prepareMessageExecution(HelixManager manager, Message message) throws HelixException,
+      HelixStateMismatchException
+  {
+    if (!message.isValid())
+    {
+      String errorMessage =
+          "Invalid Message, ensure that message: " + message
+              + " has all the required fields: "
+              + Arrays.toString(Message.Attributes.values());
+
+      _statusUpdateUtil.logError(message,
+                                 HelixStateTransitionHandler.class,
+                                 errorMessage,
+                                 manager.getHelixDataAccessor());
+      logger.error(errorMessage);
+      throw new HelixException(errorMessage);
+    }
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+
+    String partitionName = message.getPartitionName();
+    String fromState = message.getFromState();
+
+    // Verify the fromState and current state of the stateModel
+    String state = _currentStateDelta.getState(partitionName);
+
+    if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state))
+    {
+      String errorMessage =
+          "Current state of stateModel does not match the fromState in Message"
+              + ", Current State:" + state + ", message expected:" + fromState
+              + ", partition: " + partitionName + ", from: " + message.getMsgSrc()
+              + ", to: " + message.getTgtName();
+
+      _statusUpdateUtil.logError(message,
+                                 HelixStateTransitionHandler.class,
+                                 errorMessage,
+                                 accessor);
+      logger.error(errorMessage);
+      throw new HelixStateMismatchException(errorMessage);
+    }
+  }
+
+  /**
+   * Applies the outcome of a transition to the state model and the current-state delta, then
+   * persists the delta (or hands it to the group message handler in group-message mode).
+   * Nothing is applied if the manager's session differs from the message's target session.
+   *
+   * @param taskResult result of the transition
+   * @param exception  the failure, if any; drives mismatch / timeout / error handling
+   */
+  void postExecutionMessage(HelixManager manager,
+                            Message message,
+                            NotificationContext context,
+                            HelixTaskResult taskResult,
+                            Exception exception)
+  {
+    String partitionKey = message.getPartitionName();
+    String resource = message.getResourceName();
+    String sessionId = message.getTgtSessionId();
+    String instanceName = manager.getInstanceName();
+
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    int bucketSize = message.getBucketSize();
+    ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(bucketSize);
+
+    // Lock the helix manager so that the session id will not change when we update
+    // the state model state. for zk current state it is OK as we have the per-session
+    // current state node
+    synchronized (manager)
+    {
+      if (!message.getTgtSessionId().equals(manager.getSessionId()))
+      {
+        // report the session id that was actually compared above
+        logger.warn("Session id has changed. Skip postExecutionMessage. Old session "
+            + message.getTgtSessionId() + " , new session : "
+            + manager.getSessionId());
+        return;
+      }
+
+      if (taskResult.isSucess())
+      {
+        String toState = message.getToState();
+        _currentStateDelta.setState(partitionKey, toState);
+
+        if (toState.equalsIgnoreCase("DROPPED"))
+        {
+          // for "OnOfflineToDROPPED" message, we need to remove the resource key record
+          // from the current state of the instance because the resource key is dropped.
+          // In the state model it will be stayed as "OFFLINE", which is OK.
+          ZNRecordDelta delta =
+              new ZNRecordDelta(_currentStateDelta.getRecord(), MergeOperation.SUBTRACT);
+          // Don't subtract simple fields since they contain stateModelDefRef
+          delta._record.getSimpleFields().clear();
+
+          List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
+          deltaList.add(delta);
+          _currentStateDelta.setDeltaList(deltaList);
+        }
+        else
+        {
+          // if the partition is not to be dropped, update _stateModel to the TO_STATE
+          _stateModel.updateState(toState);
+        }
+      }
+      else
+      {
+        if (exception instanceof HelixStateMismatchException)
+        {
+          // if fromState mismatch, set current state on zk to stateModel's current state
+          logger.warn("Force CurrentState on Zk to be stateModel's CurrentState. partitionKey: "
+              + partitionKey
+              + ", currentState: "
+              + _stateModel.getCurrentState()
+              + ", message: " + message);
+          _currentStateDelta.setState(partitionKey, _stateModel.getCurrentState());
+        }
+        else
+        {
+          StateTransitionError error =
+              new StateTransitionError(ErrorType.INTERNAL, ErrorCode.ERROR, exception);
+          if (exception instanceof InterruptedException)
+          {
+            if (_isTimeout)
+            {
+              error =
+                  new StateTransitionError(ErrorType.INTERNAL,
+                                           ErrorCode.TIMEOUT,
+                                           exception);
+            }
+            else
+            {
+              // State transition interrupted but not caused by timeout. Keep the current
+              // state in this case
+              logger.error("State transition interrupted but not timeout. Not updating state. Partition : "
+                  + message.getPartitionName() + " MsgId : " + message.getMsgId());
+              return;
+            }
+          }
+          _stateModel.rollbackOnError(message, context, error);
+          _currentStateDelta.setState(partitionKey, "ERROR");
+          _stateModel.updateState("ERROR");
+        }
+      }
+    }
+    try
+    {
+      // Update the ZK current state of the node
+      PropertyKey key = keyBuilder.currentState(instanceName,
+                              sessionId,
+                              resource,
+                              bucketizer.getBucketName(partitionKey));
+      if (!_message.getGroupMessageMode())
+      {
+        accessor.updateProperty(key, _currentStateDelta);
+      }
+      else
+      {
+        // grouped sub-messages defer the write; the group's deltas are merged and written once
+        _executor._groupMsgHandler.addCurStateUpdate(_message, key, _currentStateDelta);
+      }
+    }
+    catch (Exception e)
+    {
+      logger.error("Error when updating the state ", e);
+      StateTransitionError error =
+          new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
+      _stateModel.rollbackOnError(message, context, error);
+      _statusUpdateUtil.logError(message,
+                                 HelixStateTransitionHandler.class,
+                                 e,
+                                 "Error when update the state ",
+                                 accessor);
+    }
+  }
+
+  /**
+   * Validates and runs the transition, then applies its outcome. Holds the state model's
+   * monitor for the whole call, so transitions on the same state model do not overlap.
+   *
+   * @return the task result; failures are reported through it rather than thrown
+   */
+  public HelixTaskResult handleMessageInternal(Message message,
+                                               NotificationContext context)
+  {
+    synchronized (_stateModel)
+    {
+      HelixTaskResult taskResult = new HelixTaskResult();
+      HelixManager manager = context.getManager();
+      HelixDataAccessor accessor = manager.getHelixDataAccessor();
+
+      _statusUpdateUtil.logInfo(message,
+                                HelixStateTransitionHandler.class,
+                                "Message handling task begin execute",
+                                accessor);
+      message.setExecuteStartTimeStamp(new Date().getTime());
+
+      Exception exception = null;
+      try
+      {
+        prepareMessageExecution(manager, message);
+        invoke(accessor, context, taskResult, message);
+      }
+      catch (HelixStateMismatchException e)
+      {
+        // Record the mismatch; postExecutionMessage resyncs the current state from the
+        // state model. The state model itself is left intact.
+        taskResult.setSuccess(false);
+        taskResult.setMessage(e.toString());
+        taskResult.setException(e);
+        exception = e;
+      }
+      catch (Exception e)
+      {
+        String errorMessage =
+            "Exception while executing a state transition task "
+                + message.getPartitionName();
+        logger.error(errorMessage, e);
+        // Method.invoke wraps exceptions from the transition; unwrap an interrupt so it is
+        // classified as one below and in postExecutionMessage
+        if (e.getCause() != null && e.getCause() instanceof InterruptedException)
+        {
+          e = (InterruptedException) e.getCause();
+        }
+        _statusUpdateUtil.logError(message,
+                                   HelixStateTransitionHandler.class,
+                                   e,
+                                   errorMessage,
+                                   accessor);
+        taskResult.setSuccess(false);
+        taskResult.setMessage(e.toString());
+        taskResult.setException(e);
+        taskResult.setInterrupted(e instanceof InterruptedException);
+        exception = e;
+      }
+      postExecutionMessage(manager, message, context, taskResult, exception);
+
+      return taskResult;
+    }
+  }
+
+  /**
+   * Looks up the state model method for the message's fromState/toState transition (taking
+   * {@code (Message, NotificationContext)}) and invokes it. If no method is found, the
+   * result is marked unsuccessful and an error is logged.
+   */
+  private void invoke(HelixDataAccessor accessor,
+                      NotificationContext context,
+                      HelixTaskResult taskResult,
+                      Message message) throws IllegalAccessException,
+      InvocationTargetException,
+      InterruptedException
+  {
+    _statusUpdateUtil.logInfo(message,
+                              HelixStateTransitionHandler.class,
+                              "Message handling invoking",
+                              accessor);
+
+    // by default, we invoke state transition function in state model
+    String fromState = message.getFromState();
+    String toState = message.getToState();
+    Method methodToInvoke =
+        _transitionMethodFinder.getMethodForTransition(_stateModel.getClass(),
+                                                       fromState,
+                                                       toState,
+                                                       new Class[] { Message.class,
+                                                           NotificationContext.class });
+    if (methodToInvoke != null)
+    {
+      methodToInvoke.invoke(_stateModel, new Object[] { message, context });
+      taskResult.setSuccess(true);
+    }
+    else
+    {
+      String errorMessage =
+          "Unable to find method for transition from " + fromState + " to " + toState
+              + " in " + _stateModel.getClass();
+      logger.error(errorMessage);
+      taskResult.setSuccess(false);
+
+      _statusUpdateUtil.logError(message,
+                                 HelixStateTransitionHandler.class,
+                                 errorMessage,
+                                 accessor);
+    }
+  }
+
+  @Override
+  public HelixTaskResult handleMessage()
+  {
+    return handleMessageInternal(_message, _notificationContext);
+  }
+
+  /**
+   * Handles errors reported by the task runner. INTERNAL errors were already handled in
+   * {@link #postExecutionMessage} and are only logged. For other errors the state model is
+   * rolled back, and for {@code ErrorCode.ERROR} the partition is moved to "ERROR".
+   */
+  @Override
+  public void onError(Exception e, ErrorCode code, ErrorType type)
+  {
+    // All internal error has been processed already, so we can skip them
+    if (type == ErrorType.INTERNAL)
+    {
+      logger.error("Skip internal error " + e.getMessage() + " " + code);
+      return;
+    }
+    HelixManager manager = _notificationContext.getManager();
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    String instanceName = manager.getInstanceName();
+    String partition = _message.getPartitionName();
+    String resourceName = _message.getResourceName();
+    CurrentState currentStateDelta = new CurrentState(resourceName);
+
+    StateTransitionError error = new StateTransitionError(type, code, e);
+    _stateModel.rollbackOnError(_message, _notificationContext, error);
+    // if the transition is not canceled, it should go into error state
+    if (code == ErrorCode.ERROR)
+    {
+      currentStateDelta.setState(partition, "ERROR");
+      _stateModel.updateState("ERROR");
+
+      accessor.updateProperty(keyBuilder.currentState(instanceName,
+                                                      _message.getTgtSessionId(),
+                                                      resourceName),
+                              currentStateDelta);
+    }
+  }
+
+  /** Marks this transition as timed out so a subsequent interrupt is reported as TIMEOUT. */
+  @Override
+  public void onTimeout()
+  {
+    _isTimeout = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
new file mode 100644
index 0000000..d14cfaa
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -0,0 +1,369 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging.handling;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Callable;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.messaging.handling.GroupMessageHandler.GroupMessageInfo;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.monitoring.StateTransitionContext;
+import org.apache.helix.monitoring.StateTransitionDataPoint;
+import org.apache.helix.util.StatusUpdateUtil;
+import org.apache.log4j.Logger;
+
+
+public class HelixTask implements Callable<HelixTaskResult>
+{
+  private static Logger             logger     = Logger.getLogger(HelixTask.class);
+  private final Message             _message;
+  private final MessageHandler      _handler;
+  private final NotificationContext _notificationContext;
+  private final HelixManager        _manager;
+  StatusUpdateUtil                  _statusUpdateUtil;
+  HelixTaskExecutor                 _executor;
+  volatile boolean                  _isTimeout = false;
+
+  public class TimeoutCancelTask extends TimerTask
+  {
+    HelixTaskExecutor   _executor;
+    Message             _message;
+    NotificationContext _context;
+
+    public TimeoutCancelTask(HelixTaskExecutor executor,
+                             Message message,
+                             NotificationContext context)
+    {
+      _executor = executor;
+      _message = message;
+      _context = context;
+    }
+
+    @Override
+    public void run()
+    {
+      _isTimeout = true;
+      logger.warn("Message time out, canceling. id:" + _message.getMsgId()
+          + " timeout : " + _message.getExecutionTimeout());
+      _handler.onTimeout();
+      _executor.cancelTask(_message, _context);
+    }
+
+  }
+
+  public HelixTask(Message message,
+                   NotificationContext notificationContext,
+                   MessageHandler handler,
+                   HelixTaskExecutor executor) throws Exception
+  {
+    this._notificationContext = notificationContext;
+    this._message = message;
+    this._handler = handler;
+    this._manager = notificationContext.getManager();
+    _statusUpdateUtil = new StatusUpdateUtil();
+    _executor = executor;
+  }
+
+  @Override
+  public HelixTaskResult call()
+  {
+    // Start the timeout TimerTask, if necessary
+    Timer timer = null;
+    if (_message.getExecutionTimeout() > 0)
+    {
+      timer = new Timer(true);
+      timer.schedule(new TimeoutCancelTask(_executor, _message, _notificationContext),
+                     _message.getExecutionTimeout());
+      logger.info("Message starts with timeout " + _message.getExecutionTimeout()
+          + " MsgId:" + _message.getMsgId());
+    }
+    else
+    {
+      logger.info("Message does not have timeout. MsgId:" + _message.getMsgId() + "/"
+          + _message.getPartitionName());
+    }
+
+    HelixTaskResult taskResult = new HelixTaskResult();
+
+    Exception exception = null;
+    ErrorType type = ErrorType.INTERNAL;
+    ErrorCode code = ErrorCode.ERROR;
+
+    long start = System.currentTimeMillis();
+    logger.info("msg:" + _message.getMsgId() + " handling task begin, at: " + start);
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    _statusUpdateUtil.logInfo(_message,
+                              HelixTask.class,
+                              "Message handling task begin execute",
+                              accessor);
+    _message.setExecuteStartTimeStamp(new Date().getTime());
+
+    // Handle the message
+    try
+    {
+      taskResult = _handler.handleMessage();
+      exception = taskResult.getException();
+    }
+    catch (InterruptedException e)
+    {
+      _statusUpdateUtil.logError(_message,
+                                 HelixTask.class,
+                                 e,
+                                 "State transition interrupted, timeout:" + _isTimeout,
+                                 accessor);
+      logger.info("Message " + _message.getMsgId() + " is interrupted");
+      taskResult.setInterrupted(true);
+      taskResult.setException(e);
+      exception = e;
+    }
+    catch (Exception e)
+    {
+      String errorMessage =
+          "Exception while executing a message. " + e + " msgId: " + _message.getMsgId()
+              + " type: " + _message.getMsgType();
+      logger.error(errorMessage, e);
+      _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, accessor);
+      taskResult.setSuccess(false);
+      taskResult.setException(e);
+      taskResult.setMessage(e.getMessage());
+      exception = e;
+    }
+
+    // Cancel the timer since the handling is done
+    // it is fine if the TimerTask for canceling is called already
+    if (timer != null)
+    {
+      timer.cancel();
+    }
+
+    if (taskResult.isSucess())
+    {
+      _statusUpdateUtil.logInfo(_message,
+                                _handler.getClass(),
+                                "Message handling task completed successfully",
+                                accessor);
+      logger.info("Message " + _message.getMsgId() + " completed.");
+    }
+    else if (taskResult.isInterrupted())
+    {
+      logger.info("Message " + _message.getMsgId() + " is interrupted");
+      code = _isTimeout ? ErrorCode.TIMEOUT : ErrorCode.CANCEL;
+      if (_isTimeout)
+      {
+        int retryCount = _message.getRetryCount();
+        logger.info("Message timeout, retry count: " + retryCount + " MSGID:"
+            + _message.getMsgId());
+        _statusUpdateUtil.logInfo(_message,
+                                  _handler.getClass(),
+                                  "Message handling task timeout, retryCount:"
+                                      + retryCount,
+                                  accessor);
+        // Notify the handler that timeout happens, and the number of retries left
+        // In case timeout happens (time out and also interrupted)
+        // we should retry the execution of the message by re-schedule it in
+        if (retryCount > 0)
+        {
+          _message.setRetryCount(retryCount - 1);
+          _executor.scheduleTask(_message, _handler, _notificationContext);
+          return taskResult;
+        }
+      }
+    }
+    else
+    // logging for errors
+    {
+      String errorMsg =
+          "Message execution failed. msgId: " + _message.getMsgId()
+              + taskResult.getMessage();
+      if (exception != null)
+      {
+        errorMsg += exception;
+      }
+      logger.error(errorMsg, exception);
+      _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, accessor);
+    }
+
+    // Post-processing for the finished task
+    try
+    {
+      if (!_message.getGroupMessageMode())
+      {
+        removeMessageFromZk(accessor, _message);
+        reportMessageStat(_manager, _message, taskResult);
+        sendReply(accessor, _message, taskResult);
+      }
+      else
+      {
+        GroupMessageInfo info = _executor._groupMsgHandler.onCompleteSubMessage(_message); 
+        if (info != null)
+        {
+          // TODO: changed to async update
+          // group update current state
+          Map<PropertyKey, CurrentState> curStateMap = info.merge();
+          for (PropertyKey key : curStateMap.keySet())
+          {
+            accessor.updateProperty(key, curStateMap.get(key));
+          }
+
+          // remove group message
+          removeMessageFromZk(accessor, _message);
+          reportMessageStat(_manager, _message, taskResult);
+          sendReply(accessor, _message, taskResult);
+        }
+      }
+      _executor.reportCompletion(_message);
+    }
+
+    // TODO: capture errors and log here
+    catch (Exception e)
+    {
+      String errorMessage =
+          "Exception after executing a message, msgId: " + _message.getMsgId() + e;
+      logger.error(errorMessage, e);
+      _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, accessor);
+      exception = e;
+      type = ErrorType.FRAMEWORK;
+      code = ErrorCode.ERROR;
+    }
+    //
+    finally
+    {
+      long end = System.currentTimeMillis();
+      logger.info("msg:" + _message.getMsgId() + " handling task completed, results:"
+          + taskResult.isSucess() + ", at: " + end + ", took:" + (end - start));
+
+      // Notify the handler about any error happened in the handling procedure, so that
+      // the handler have chance to finally cleanup
+      if (exception != null)
+      {
+        _handler.onError(exception, code, type);
+      }
+    }
+    return taskResult;
+  }
+
+  private void removeMessageFromZk(HelixDataAccessor accessor, Message message)
+  {
+    Builder keyBuilder = accessor.keyBuilder();
+    if (message.getTgtName().equalsIgnoreCase("controller"))
+    {
+      // TODO: removeProperty returns boolean
+      accessor.removeProperty(keyBuilder.controllerMessage(message.getMsgId()));
+    }
+    else
+    {
+      accessor.removeProperty(keyBuilder.message(_manager.getInstanceName(),
+                                                 message.getMsgId()));
+    }
+  }
+
+  private void sendReply(HelixDataAccessor accessor,
+                         Message message,
+                         HelixTaskResult taskResult)
+  {
+    if (_message.getCorrelationId() != null
+        && !message.getMsgType().equals(MessageType.TASK_REPLY.toString()))
+    {
+      logger.info("Sending reply for message " + message.getCorrelationId());
+      _statusUpdateUtil.logInfo(message, HelixTask.class, "Sending reply", accessor);
+
+      taskResult.getTaskResultMap().put("SUCCESS", "" + taskResult.isSucess());
+      taskResult.getTaskResultMap().put("INTERRUPTED", "" + taskResult.isInterrupted());
+      if (!taskResult.isSucess())
+      {
+        taskResult.getTaskResultMap().put("ERRORINFO", taskResult.getMessage());
+      }
+      Message replyMessage =
+          Message.createReplyMessage(_message,
+                                     _manager.getInstanceName(),
+                                     taskResult.getTaskResultMap());
+      replyMessage.setSrcInstanceType(_manager.getInstanceType());
+
+      if (message.getSrcInstanceType() == InstanceType.PARTICIPANT)
+      {
+        Builder keyBuilder = accessor.keyBuilder();
+        accessor.setProperty(keyBuilder.message(message.getMsgSrc(),
+                                                replyMessage.getMsgId()),
+                             replyMessage);
+      }
+      else if (message.getSrcInstanceType() == InstanceType.CONTROLLER)
+      {
+        Builder keyBuilder = accessor.keyBuilder();
+        accessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId()),
+                             replyMessage);
+      }
+      _statusUpdateUtil.logInfo(message, HelixTask.class, "1 msg replied to "
+          + replyMessage.getTgtName(), accessor);
+    }
+  }
+
+  private void reportMessageStat(HelixManager manager,
+                                 Message message,
+                                 HelixTaskResult taskResult)
+  {
+    // report stat
+    if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
+    {
+      return;
+    }
+    long now = new Date().getTime();
+    long msgReadTime = message.getReadTimeStamp();
+    long msgExecutionStartTime = message.getExecuteStartTimeStamp();
+    if (msgReadTime != 0 && msgExecutionStartTime != 0)
+    {
+      long totalDelay = now - msgReadTime;
+      long executionDelay = now - msgExecutionStartTime;
+      if (totalDelay > 0 && executionDelay > 0)
+      {
+        String fromState = message.getFromState();
+        String toState = message.getToState();
+        String transition = fromState + "--" + toState;
+
+        StateTransitionContext cxt =
+            new StateTransitionContext(manager.getClusterName(),
+                                       manager.getInstanceName(),
+                                       message.getResourceName(),
+                                       transition);
+
+        StateTransitionDataPoint data =
+            new StateTransitionDataPoint(totalDelay,
+                                         executionDelay,
+                                         taskResult.isSucess());
+        _executor.getParticipantMonitor().reportTransitionStat(cxt, data);
+      }
+    }
+    else
+    {
+      logger.warn("message read time and start execution time not recorded.");
+    }
+  }
+
+};

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
new file mode 100644
index 0000000..daff92b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -0,0 +1,638 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging.handling;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.MessageListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.NotificationContext.Type;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.Attributes;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.monitoring.ParticipantMonitor;
+import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.helix.util.StatusUpdateUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * {@link MessageListener} that turns incoming Helix messages into {@link MessageHandler}s
+ * (via registered {@link MessageHandlerFactory}s) and runs them as {@link HelixTask}s on
+ * thread pools.
+ *
+ * <p>Each registered message type gets its own fixed-size pool. State transition messages
+ * may additionally use a per-resource pool when the resource-scope {@value #MAX_THREADS}
+ * config is set to a positive number. Scheduled tasks are tracked by "msgId/partitionName"
+ * so they can be cancelled ({@link #cancelTask}) or cleaned up ({@link #reportCompletion}).
+ */
+public class HelixTaskExecutor implements MessageListener
+{
+  // TODO: we need to further design how to throttle this.
+  // From storage point of view, only bootstrap case is expensive
+  // and we need to throttle, which is mostly IO / network bounded.
+  /** Pool size used when a factory is registered without an explicit size. */
+  public static final int DEFAULT_PARALLEL_TASKS = 40;
+
+  /** Resource-scope config key holding the size of a per-resource state transition pool. */
+  public static final String MAX_THREADS = "maxThreads";
+
+  private static final Logger LOG = Logger.getLogger(HelixTaskExecutor.class);
+
+  // TODO: create per-task type threadpool with customizable pool size
+  /** Futures of scheduled tasks, keyed by "msgId/partitionName". */
+  protected final Map<String, Future<HelixTaskResult>> _taskMap;
+  private final Object _lock;
+  private final StatusUpdateUtil _statusUpdateUtil;
+  private final ParticipantMonitor _monitor;
+
+  /** Registered handler factories, keyed by message type. */
+  final ConcurrentHashMap<String, MessageHandlerFactory> _handlerFactoryMap =
+      new ConcurrentHashMap<String, MessageHandlerFactory>();
+
+  /**
+   * Thread pools keyed by message type, plus "&lt;STATE_TRANSITION type&gt;.resourceName"
+   * keys for per-resource pools created by {@link #checkResourceConfig}.
+   */
+  final ConcurrentHashMap<String, ExecutorService> _threadpoolMap =
+      new ConcurrentHashMap<String, ExecutorService>();
+
+  /** Resources whose config was already checked, mapped to the size read (-1 if none). */
+  Map<String, Integer> _resourceThreadpoolSizeMap = new ConcurrentHashMap<String, Integer>();
+
+  final GroupMessageHandler _groupMsgHandler;
+
+  /**
+   * Creates an executor with no registered handler factories.
+   */
+  public HelixTaskExecutor()
+  {
+    _taskMap = new ConcurrentHashMap<String, Future<HelixTaskResult>>();
+    _groupMsgHandler = new GroupMessageHandler();
+
+    _lock = new Object();
+    _statusUpdateUtil = new StatusUpdateUtil();
+    _monitor = new ParticipantMonitor();
+    startMonitorThread();
+  }
+
+  /**
+   * Registers a handler factory backed by a pool of {@link #DEFAULT_PARALLEL_TASKS} threads.
+   *
+   * @param type message type handled by the factory
+   * @param factory factory whose {@code getMessageType()} equals {@code type} (ignoring case)
+   * @throws HelixException if the factory's message type does not match {@code type}
+   */
+  public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory)
+  {
+    registerMessageHandlerFactory(type, factory, DEFAULT_PARALLEL_TASKS);
+  }
+
+  /**
+   * Registers a handler factory and creates a fixed-size thread pool for its message type.
+   * A second registration for an already-registered type is logged and ignored.
+   *
+   * @param type message type handled by the factory
+   * @param factory factory whose {@code getMessageType()} equals {@code type} (ignoring case)
+   * @param threadpoolSize number of threads in the pool created for {@code type}
+   * @throws HelixException if the factory's message type does not match {@code type}
+   */
+  public void registerMessageHandlerFactory(String type,
+                                            MessageHandlerFactory factory,
+                                            int threadpoolSize)
+  {
+    if (_handlerFactoryMap.containsKey(type))
+    {
+      LOG.error("Ignoring duplicate msg handler factory for type " + type);
+      return;
+    }
+    if (!type.equalsIgnoreCase(factory.getMessageType()))
+    {
+      throw new HelixException("Message factory type mismatch. Type: " + type
+          + " factory : " + factory.getMessageType());
+    }
+    _handlerFactoryMap.put(type, factory);
+    _threadpoolMap.put(type, Executors.newFixedThreadPool(threadpoolSize));
+    LOG.info("Adding msg factory for type " + type + " threadpool size " + threadpoolSize);
+  }
+
+  /**
+   * @return the monitor that collects state transition statistics
+   */
+  public ParticipantMonitor getParticipantMonitor()
+  {
+    return _monitor;
+  }
+
+  private void startMonitorThread()
+  {
+    // Placeholder: intended to start a thread monitoring task completions; currently a no-op.
+  }
+
+  /**
+   * On the first state transition for a resource, reads its {@value #MAX_THREADS} config and,
+   * if it is a positive integer, creates a dedicated thread pool for that resource. The result
+   * (or -1 when absent/invalid) is cached so the config is read only once per resource.
+   *
+   * @param resourceName resource of the message; {@code null} is ignored
+   * @param manager manager used to obtain the config accessor and cluster name
+   */
+  void checkResourceConfig(String resourceName, HelixManager manager)
+  {
+    // ConcurrentHashMap rejects null keys; findExecutorServiceForMsg() already falls back to
+    // the per-type pool when the resource name is null, so there is nothing to configure.
+    if (resourceName == null || _resourceThreadpoolSizeMap.containsKey(resourceName))
+    {
+      return;
+    }
+    int threadpoolSize = -1;
+    ConfigAccessor configAccessor = manager.getConfigAccessor();
+    if (configAccessor != null)
+    {
+      ConfigScope scope =
+          new ConfigScopeBuilder().forCluster(manager.getClusterName())
+                                  .forResource(resourceName)
+                                  .build();
+
+      String threadpoolSizeStr = configAccessor.get(scope, MAX_THREADS);
+      if (threadpoolSizeStr != null)
+      {
+        try
+        {
+          threadpoolSize = Integer.parseInt(threadpoolSizeStr);
+        }
+        catch (NumberFormatException e)
+        {
+          LOG.error("Invalid " + MAX_THREADS + " config for resource " + resourceName + ": "
+              + threadpoolSizeStr, e);
+        }
+      }
+    }
+    if (threadpoolSize > 0)
+    {
+      String key = MessageType.STATE_TRANSITION.toString() + "." + resourceName;
+      _threadpoolMap.put(key, Executors.newFixedThreadPool(threadpoolSize));
+      LOG.info("Adding per resource threadpool for resource " + resourceName
+          + " with size " + threadpoolSize);
+    }
+    _resourceThreadpoolSizeMap.put(resourceName, threadpoolSize);
+  }
+
+  /**
+   * Finds the executor service for a message: the per-resource pool for state transition
+   * messages when one exists, otherwise the pool registered for the message type.
+   *
+   * @param message message to be executed
+   * @return the matching executor, or {@code null} if no pool exists for the message type
+   */
+  ExecutorService findExecutorServiceForMsg(Message message)
+  {
+    ExecutorService executorService = _threadpoolMap.get(message.getMsgType());
+    if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
+    {
+      String resourceName = message.getResourceName();
+      if (resourceName != null)
+      {
+        String key = message.getMsgType() + "." + resourceName;
+        ExecutorService perResourceService = _threadpoolMap.get(key);
+        if (perResourceService != null)
+        {
+          LOG.info("Find per-resource thread pool with key " + key);
+          executorService = perResourceService;
+        }
+      }
+    }
+    return executorService;
+  }
+
+  /**
+   * Submits a {@link HelixTask} for the message/handler pair unless a task with the same
+   * "msgId/partitionName" id is already tracked. Failures are logged and recorded as a
+   * status update rather than thrown.
+   *
+   * @param message message to execute
+   * @param handler handler created for the message; must not be null
+   * @param notificationContext context providing the manager and data accessor
+   */
+  public void scheduleTask(Message message,
+                           MessageHandler handler,
+                           NotificationContext notificationContext)
+  {
+    assert (handler != null);
+    synchronized (_lock)
+    {
+      try
+      {
+        String taskId = getTaskId(message);
+
+        if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
+        {
+          checkResourceConfig(message.getResourceName(), notificationContext.getManager());
+        }
+        LOG.info("Scheduling message: " + taskId);
+
+        _statusUpdateUtil.logInfo(message,
+                                  HelixTaskExecutor.class,
+                                  "Message handling task scheduled",
+                                  notificationContext.getManager().getHelixDataAccessor());
+
+        HelixTask task = new HelixTask(message, notificationContext, handler, this);
+        if (!_taskMap.containsKey(taskId))
+        {
+          LOG.info("Message:" + taskId + " handling task scheduled");
+          Future<HelixTaskResult> future = findExecutorServiceForMsg(message).submit(task);
+          _taskMap.put(taskId, future);
+        }
+        else
+        {
+          _statusUpdateUtil.logWarning(message,
+                                       HelixTaskExecutor.class,
+                                       "Message handling task already sheduled for "
+                                           + taskId,
+                                       notificationContext.getManager()
+                                                          .getHelixDataAccessor());
+        }
+      }
+      catch (Exception e)
+      {
+        LOG.error("Error while executing task." + message, e);
+
+        _statusUpdateUtil.logError(message,
+                                   HelixTaskExecutor.class,
+                                   e,
+                                   "Error while executing task " + e,
+                                   notificationContext.getManager()
+                                                      .getHelixDataAccessor());
+      }
+    }
+  }
+
+  /**
+   * Attempts to cancel the tracked task for the message, interrupting it if running.
+   * The task is untracked only when the cancel succeeds.
+   *
+   * @param message message whose task should be cancelled
+   * @param notificationContext context providing the data accessor for status updates
+   */
+  public void cancelTask(Message message, NotificationContext notificationContext)
+  {
+    synchronized (_lock)
+    {
+      String taskId = getTaskId(message);
+      HelixDataAccessor accessor = notificationContext.getManager().getHelixDataAccessor();
+      Future<HelixTaskResult> future = _taskMap.get(taskId);
+
+      if (future == null)
+      {
+        _statusUpdateUtil.logWarning(message,
+                                     HelixTaskExecutor.class,
+                                     "Future not found when trying to cancel " + taskId,
+                                     accessor);
+        return;
+      }
+
+      _statusUpdateUtil.logInfo(message,
+                                HelixTaskExecutor.class,
+                                "Trying to cancel the future for " + taskId,
+                                accessor);
+
+      // cancel(true) interrupts a running task, so state transition callbacks
+      // should check for interruption and return promptly.
+      if (future.cancel(true))
+      {
+        _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceled " + taskId, accessor);
+        _taskMap.remove(taskId);
+      }
+      else
+      {
+        _statusUpdateUtil.logInfo(message,
+                                  HelixTaskExecutor.class,
+                                  "false when trying to cancel the message " + taskId,
+                                  accessor);
+      }
+    }
+  }
+
+  /**
+   * Called when a task finishes; logs its duration and stops tracking it.
+   *
+   * @param message the completed message
+   */
+  protected void reportCompletion(Message message)
+  {
+    synchronized (_lock)
+    {
+      String taskId = getTaskId(message);
+      LOG.info("message finished: " + taskId + ", took "
+          + (System.currentTimeMillis() - message.getExecuteStartTimeStamp()));
+      if (_taskMap.remove(taskId) == null)
+      {
+        LOG.warn("message " + taskId + " not found in task map");
+      }
+    }
+  }
+
+  /** Task-map key for a message: "msgId/partitionName". */
+  private static String getTaskId(Message message)
+  {
+    return message.getMsgId() + "/" + message.getPartitionName();
+  }
+
+  /**
+   * Writes the given messages back under the instance's message keys in one batch call.
+   */
+  private void updateMessageState(List<Message> readMsgs,
+                                  HelixDataAccessor accessor,
+                                  String instanceName)
+  {
+    Builder keyBuilder = accessor.keyBuilder();
+    List<PropertyKey> readMsgKeys = new ArrayList<PropertyKey>();
+    for (Message msg : readMsgs)
+    {
+      readMsgKeys.add(msg.getKey(keyBuilder, instanceName));
+    }
+    accessor.setChildren(readMsgKeys, readMsgs);
+  }
+
+  /**
+   * Handles a message notification.
+   *
+   * <p>On FINALIZE, resets all handler factories and cancels all tracked tasks. Otherwise,
+   * processes NEW messages in creation-time order: NO_OP messages and messages addressed to a
+   * different session are removed; handlers are created (messages without a factory stay NEW,
+   * messages whose handler creation fails are marked UNPROCESSABLE); handled messages are
+   * marked READ in one batch, missing current-state meta records are created in one batch,
+   * and the handlers are scheduled.
+   *
+   * @param instanceName instance the messages belong to
+   * @param messages messages to process; sorted in place by creation time
+   * @param changeContext notification context
+   */
+  @Override
+  public void onMessage(String instanceName,
+                        List<Message> messages,
+                        NotificationContext changeContext)
+  {
+    // On FINALIZE, reset all handler factories and cancel all scheduled tasks.
+    // Note that the thread pools themselves are not shut down here (see shutDown()).
+    // TODO: see if we should have a separate notification call for resetting
+    if (changeContext.getType() == Type.FINALIZE)
+    {
+      LOG.info("Get FINALIZE notification");
+      for (MessageHandlerFactory factory : _handlerFactoryMap.values())
+      {
+        factory.reset();
+      }
+      // NOTE(review): _lock is intentionally(?) not taken here. _taskMap is a
+      // ConcurrentHashMap so iterating is safe, but a concurrent scheduleTask() could add a
+      // task between the cancel loop and clear() - confirm this is acceptable.
+      for (Future<HelixTaskResult> f : _taskMap.values())
+      {
+        f.cancel(true);
+      }
+      _taskMap.clear();
+      return;
+    }
+
+    HelixManager manager = changeContext.getManager();
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    if (messages == null || messages.isEmpty())
+    {
+      LOG.info("No Messages to process");
+      return;
+    }
+
+    // sort message by creation timestamp, so message created earlier is processed first
+    Collections.sort(messages, Message.CREATE_TIME_COMPARATOR);
+
+    // message handlers created
+    List<MessageHandler> handlers = new ArrayList<MessageHandler>();
+
+    // messages marked READ in this round
+    List<Message> readMsgs = new ArrayList<Message>();
+
+    String sessionId = manager.getSessionId();
+    List<String> curResourceNames =
+        accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
+    List<PropertyKey> createCurStateKeys = new ArrayList<PropertyKey>();
+    List<CurrentState> metaCurStates = new ArrayList<CurrentState>();
+    Set<String> createCurStateNames = new HashSet<String>();
+
+    changeContext.add(NotificationContext.TASK_EXECUTOR_KEY, this);
+    for (Message message : messages)
+    {
+      // nop messages are simply removed. It is used to trigger onMessage() in
+      // situations such as register a new message handler factory
+      if (message.getMsgType().equalsIgnoreCase(MessageType.NO_OP.toString()))
+      {
+        LOG.info("Dropping NO-OP message. mid: " + message.getId() + ", from: "
+            + message.getMsgSrc());
+        accessor.removeProperty(message.getKey(keyBuilder, instanceName));
+        continue;
+      }
+
+      String tgtSessionId = message.getTgtSessionId();
+
+      // Remove messages addressed to another session. "*" matches any session.
+      // "*".equals(...) keeps a message without a target session from throwing an NPE
+      // (which would abort the whole batch); such a message is treated as a mismatch.
+      if (!sessionId.equals(tgtSessionId) && !"*".equals(tgtSessionId))
+      {
+        String warningMessage =
+            "SessionId does NOT match. expected sessionId: " + sessionId
+                + ", tgtSessionId in message: " + tgtSessionId + ", messageId: "
+                + message.getMsgId();
+        LOG.warn(warningMessage);
+        accessor.removeProperty(message.getKey(keyBuilder, instanceName));
+        _statusUpdateUtil.logWarning(message,
+                                     HelixStateMachineEngine.class,
+                                     warningMessage,
+                                     accessor);
+        continue;
+      }
+
+      // Skip messages that are not NEW (READ or UNPROCESSABLE). Messages are kept until the
+      // current state is updated, so already-read ones show up again on later notifications.
+      if (MessageState.NEW != message.getMsgState())
+      {
+        LOG.trace("Message already read. mid: " + message.getMsgId());
+        continue;
+      }
+
+      // create message handlers; if no factory is registered, leave the message NEW
+      try
+      {
+        List<MessageHandler> createHandlers = createMessageHandlers(message, changeContext);
+        if (createHandlers.isEmpty())
+        {
+          continue;
+        }
+        handlers.addAll(createHandlers);
+      }
+      catch (Exception e)
+      {
+        LOG.error("Failed to create message handler for " + message.getMsgId(), e);
+        String error =
+            "Failed to create message handler for " + message.getMsgId()
+                + ", exception: " + e;
+
+        _statusUpdateUtil.logError(message,
+                                   HelixStateMachineEngine.class,
+                                   e,
+                                   error,
+                                   accessor);
+
+        // Mark the message UNPROCESSABLE: it stays in zookeeper but will not be processed
+        message.setMsgState(MessageState.UNPROCESSABLE);
+        accessor.updateProperty(message.getKey(keyBuilder, instanceName), message);
+        continue;
+      }
+
+      // update msgState to read
+      message.setMsgState(MessageState.READ);
+      message.setReadTimeStamp(System.currentTimeMillis());
+      message.setExecuteSessionId(manager.getSessionId());
+
+      _statusUpdateUtil.logInfo(message,
+                                HelixStateMachineEngine.class,
+                                "New Message",
+                                accessor);
+
+      readMsgs.add(message);
+
+      // Collect current-state meta records to create in one batch, for non-controller state
+      // transition messages whose resource has no current state under this session yet.
+      if (!message.isControlerMsg()
+          && message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString()))
+      {
+        String resourceName = message.getResourceName();
+        if (!curResourceNames.contains(resourceName)
+            && createCurStateNames.add(resourceName))
+        {
+          createCurStateKeys.add(keyBuilder.currentState(instanceName,
+                                                         sessionId,
+                                                         resourceName));
+          metaCurStates.add(createMetaCurrentState(message, sessionId));
+        }
+      }
+    }
+
+    // batch create curState meta
+    if (!createCurStateKeys.isEmpty())
+    {
+      try
+      {
+        accessor.createChildren(createCurStateKeys, metaCurStates);
+      }
+      catch (Exception e)
+      {
+        LOG.error("Failed to create current state meta for resources " + createCurStateNames,
+                  e);
+      }
+    }
+
+    // update message state to READ in batch and schedule all read messages
+    if (!readMsgs.isEmpty())
+    {
+      updateMessageState(readMsgs, accessor, instanceName);
+
+      for (MessageHandler handler : handlers)
+      {
+        scheduleTask(handler._message, handler, changeContext);
+      }
+    }
+  }
+
+  /**
+   * Builds the current-state meta record for the message's resource, copying bucket size,
+   * state model definition, group-message mode and state model factory name (defaulting to
+   * {@code HelixConstants.DEFAULT_STATE_MODEL_FACTORY}) from the message.
+   */
+  private static CurrentState createMetaCurrentState(Message message, String sessionId)
+  {
+    CurrentState metaCurState = new CurrentState(message.getResourceName());
+    metaCurState.setBucketSize(message.getBucketSize());
+    metaCurState.setStateModelDefRef(message.getStateModelDef());
+    metaCurState.setSessionId(sessionId);
+    metaCurState.setGroupMessageMode(message.getGroupMessageMode());
+    String ftyName = message.getStateModelFactoryName();
+    if (ftyName != null)
+    {
+      metaCurState.setStateModelFactoryName(ftyName);
+    }
+    else
+    {
+      metaCurState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+    }
+    return metaCurState;
+  }
+
+  /**
+   * Creates a handler via the factory registered for the message type.
+   *
+   * @return the handler, or {@code null} if no factory is registered for the type (the
+   *         message is kept and handled once a matching factory is registered)
+   */
+  private MessageHandler createMessageHandler(Message message,
+                                              NotificationContext changeContext)
+  {
+    String msgType = message.getMsgType();
+
+    MessageHandlerFactory handlerFactory = _handlerFactoryMap.get(msgType);
+    if (handlerFactory == null)
+    {
+      LOG.warn("Fail to find message handler factory for type: " + msgType + " mid:"
+          + message.getMsgId());
+      return null;
+    }
+
+    return handlerFactory.createHandler(message, changeContext);
+  }
+
+  /**
+   * Creates the handlers for a message. A normal message yields at most one handler. A group
+   * message is registered with the group handler and split into one sub-message per
+   * partition name (tagged with the parent message id), each yielding at most one handler.
+   *
+   * @return the created handlers; empty if no factory is registered for the message type
+   */
+  private List<MessageHandler> createMessageHandlers(Message message,
+                                                     NotificationContext changeContext)
+  {
+    List<MessageHandler> handlers = new ArrayList<MessageHandler>();
+    if (!message.getGroupMessageMode())
+    {
+      LOG.info("Creating handler for message " + message.getMsgId() + "/"
+          + message.getPartitionName());
+
+      MessageHandler handler = createMessageHandler(message, changeContext);
+      if (handler != null)
+      {
+        handlers.add(handler);
+      }
+      return handlers;
+    }
+
+    _groupMsgHandler.put(message);
+
+    for (String partitionName : message.getPartitionNames())
+    {
+      Message subMsg = new Message(message.getRecord());
+      subMsg.setPartitionName(partitionName);
+      subMsg.setAttribute(Attributes.PARENT_MSG_ID, message.getId());
+
+      LOG.info("Creating handler for group message " + subMsg.getMsgId() + "/"
+          + partitionName);
+      MessageHandler handler = createMessageHandler(subMsg, changeContext);
+      if (handler != null)
+      {
+        handlers.add(handler);
+      }
+    }
+    return handlers;
+  }
+
+  /**
+   * Shuts down all thread pools (interrupting running tasks), waits up to 200 ms per pool
+   * for termination, then shuts down the participant monitor.
+   */
+  public void shutDown()
+  {
+    LOG.info("shutting down TaskExecutor");
+    synchronized (_lock)
+    {
+      for (Map.Entry<String, ExecutorService> entry : _threadpoolMap.entrySet())
+      {
+        List<Runnable> tasksLeft = entry.getValue().shutdownNow();
+        LOG.info(tasksLeft.size() + " tasks are still in the threadpool for msgType "
+            + entry.getKey());
+      }
+      for (Map.Entry<String, ExecutorService> entry : _threadpoolMap.entrySet())
+      {
+        try
+        {
+          if (!entry.getValue().awaitTermination(200, TimeUnit.MILLISECONDS))
+          {
+            LOG.warn(entry.getKey() + " is not fully termimated in 200 MS");
+          }
+        }
+        catch (InterruptedException e)
+        {
+          LOG.error("Interrupted", e);
+          // Restore the interrupt flag for the caller and stop waiting on remaining pools.
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+    _monitor.shutDown();
+    LOG.info("shutdown finished");
+  }
+
+  // TODO: remove this. Ad-hoc manual check; note that join() on the current thread never
+  // returns, so the final println is never reached.
+  public static void main(String[] args) throws Exception
+  {
+    ExecutorService pool = Executors.newFixedThreadPool(DEFAULT_PARALLEL_TASKS);
+    Future<HelixTaskResult> future;
+    future = pool.submit(new Callable<HelixTaskResult>()
+    {
+      @Override
+      public HelixTaskResult call() throws Exception
+      {
+        System.out.println("CMTaskExecutor.main(...).new Callable() {...}.call()");
+        return null;
+      }
+    });
+    future = pool.submit(new HelixTask(null, null, null, null));
+    Thread.currentThread().join();
+    System.out.println(future.isDone());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
new file mode 100644
index 0000000..ee6919e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging.handling;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Result of running a message handling task: a success flag, an optional text message,
+ * a mutable string-to-string result map, an interruption flag and an optional exception.
+ * Instances start unsuccessful, uninterrupted, with an empty message and no exception.
+ */
+public class HelixTaskResult
+{
+  private boolean _success;
+  private String _message = "";
+  private Map<String, String> _taskResultMap = new HashMap<String, String>();
+  private boolean _interrupted = false;
+  Exception _exception = null;
+
+  /**
+   * @return whether the task succeeded
+   */
+  public boolean isSuccess()
+  {
+    return _success;
+  }
+
+  /**
+   * Misspelled alias of {@link #isSuccess()}, kept for existing callers.
+   *
+   * @return whether the task succeeded
+   */
+  public boolean isSucess()
+  {
+    return isSuccess();
+  }
+
+  /**
+   * @return whether the task was marked as interrupted
+   */
+  public boolean isInterrupted()
+  {
+    return _interrupted;
+  }
+
+  /**
+   * @param interrupted whether the task was interrupted
+   */
+  public void setInterrupted(boolean interrupted)
+  {
+    _interrupted = interrupted;
+  }
+
+  /**
+   * @param success whether the task succeeded
+   */
+  public void setSuccess(boolean success)
+  {
+    this._success = success;
+  }
+
+  /**
+   * @return the result message; empty string if none was set
+   */
+  public String getMessage()
+  {
+    return _message;
+  }
+
+  /**
+   * @param message the result message
+   */
+  public void setMessage(String message)
+  {
+    this._message = message;
+  }
+
+  /**
+   * Returns the live (mutable) result map; callers add entries to it directly.
+   *
+   * @return the task result map, never null
+   */
+  public Map<String, String> getTaskResultMap()
+  {
+    return _taskResultMap;
+  }
+
+  /**
+   * @param e the exception raised by the task, or null
+   */
+  public void setException(Exception e)
+  {
+    _exception = e;
+  }
+
+  /**
+   * @return the exception raised by the task, or null if none was recorded
+   */
+  public Exception getException()
+  {
+    return _exception;
+  }
+}