You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC

[19/47] Refactoring from com.linkedin.helix to org.apache.helix

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/package-info.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/package-info.java
deleted file mode 100644
index bb5e7f8..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * zookeeper-based implementation of Helix cluster manager
- * 
- */
-package com.linkedin.helix.manager.zk;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/messaging/AsyncCallback.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/messaging/AsyncCallback.java b/helix-core/src/main/java/com/linkedin/helix/messaging/AsyncCallback.java
deleted file mode 100644
index 330ec57..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/messaging/AsyncCallback.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.model.Message;
-
-public abstract class AsyncCallback
-{
-
-  private static Logger _logger = Logger.getLogger(AsyncCallback.class);
-  long _startTimeStamp = 0;
-  protected long _timeout = -1;
-  Timer _timer = null;
-  List<Message> _messagesSent;
-  protected final List<Message> _messageReplied = new ArrayList<Message>();
-  boolean _timedOut = false;
-  boolean _isInterrupted = false;
-
-  /**
-   * Enforcing timeout to be set
-   * 
-   * @param timeout
-   */
-  public AsyncCallback(long timeout)
-  {
-    _logger.info("Setting time out to " + timeout + " ms");
-    _timeout = timeout;
-  }
-  
-  public AsyncCallback()
-  {
-    this(-1);
-  }
-
-  public final void setTimeout(long timeout)
-  {
-    _logger.info("Setting time out to " + timeout + " ms");
-    _timeout = timeout;
-
-  }
-
-  public List<Message> getMessageReplied()
-  {
-    return _messageReplied;
-  }
-
-  public boolean isInterrupted()
-  {
-    return _isInterrupted;
-  }
-
-  public void setInterrupted(boolean b)
-  {
-    _isInterrupted = true;
-  }
-
-  public synchronized final void onReply(Message message)
-  {
-    _logger.info("OnReply msg " + message.getMsgId());
-    if (!isDone())
-    {
-      _messageReplied.add(message);
-      try
-      {
-        onReplyMessage(message);
-      }
-      catch(Exception e) 
-      {
-        _logger.error(e);
-      }
-    }
-    if (isDone())
-    {
-      if(_timer != null)
-      {
-        _timer.cancel();
-      }
-      notifyAll();
-    }
-  }
-
-  /**
-   * Default implementation will wait until every message sent gets a response
-   * 
-   * @return
-   */
-  public boolean isDone()
-  {
-    return _messageReplied.size() == _messagesSent.size();
-  }
-
-  public boolean isTimedOut()
-  {
-    return _timedOut;
-  }
-
-  final void setMessagesSent(List<Message> generatedMessage)
-  {
-    _messagesSent = generatedMessage;
-  }
-  
-  final void startTimer()
-  {
-    if (_timer == null && _timeout > 0)
-    {
-      if (_startTimeStamp == 0)
-      {
-        _startTimeStamp = new Date().getTime();
-      }
-      _timer = new Timer(true);
-      _timer.schedule(new TimeoutTask(this), _timeout);
-    }  
-  }
-  
-  public abstract void onTimeOut();
-
-  public abstract void onReplyMessage(Message message);
-
-  class TimeoutTask extends TimerTask
-  {
-    AsyncCallback _callback;
-
-    public TimeoutTask(AsyncCallback asyncCallback)
-    {
-      _callback = asyncCallback;
-    }
-
-    @Override
-    public void run()
-    {
-      try
-      {
-        synchronized (_callback)
-        {
-          _callback._timedOut = true;
-          _callback.notifyAll();
-          _callback.onTimeOut();
-        }
-      } 
-      catch (Exception e)
-      {
-        _logger.error(e);
-      }
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/messaging/CriteriaEvaluator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/messaging/CriteriaEvaluator.java b/helix-core/src/main/java/com/linkedin/helix/messaging/CriteriaEvaluator.java
deleted file mode 100644
index 6de0d64..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/messaging/CriteriaEvaluator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-import org.josql.Query;
-import org.josql.QueryExecutionException;
-import org.josql.QueryParseException;
-import org.josql.QueryResults;
-
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.Criteria;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.Criteria.DataSource;
-import com.linkedin.helix.josql.ClusterJosqlQueryProcessor;
-import com.linkedin.helix.josql.ZNRecordRow;
-
-public class CriteriaEvaluator
-{
-  private static Logger logger = Logger.getLogger(CriteriaEvaluator.class);
-  
-  public List<Map<String, String>> evaluateCriteria(Criteria recipientCriteria, HelixManager manager)
-  {
-    List<Map<String, String>> selected = new ArrayList<Map<String, String>>();
-    
-    String queryFields = 
-        (!recipientCriteria.getInstanceName().equals("")  ? " " + ZNRecordRow.MAP_SUBKEY  : " ''") +","+
-        (!recipientCriteria.getResource().equals("") ? " " + ZNRecordRow.ZNRECORD_ID : " ''") +","+
-        (!recipientCriteria.getPartition().equals("")   ? " " + ZNRecordRow.MAP_KEY   : " ''") +","+
-        (!recipientCriteria.getPartitionState().equals("") ? " " + ZNRecordRow.MAP_VALUE : " '' ");
-    
-    String matchCondition = 
-        ZNRecordRow.MAP_SUBKEY   + " LIKE '" + (!recipientCriteria.getInstanceName().equals("") ? (recipientCriteria.getInstanceName() +"'") :   "%' ") + " AND "+
-        ZNRecordRow.ZNRECORD_ID+ " LIKE '" + (!recipientCriteria.getResource().equals("") ? (recipientCriteria.getResource() +"'") : "%' ") + " AND "+
-        ZNRecordRow.MAP_KEY   + " LIKE '" + (!recipientCriteria.getPartition().equals("")   ? (recipientCriteria.getPartition()  +"'") :  "%' ") + " AND "+
-        ZNRecordRow.MAP_VALUE  + " LIKE '" + (!recipientCriteria.getPartitionState().equals("") ? (recipientCriteria.getPartitionState()+"'") :  "%' ") + " AND "+
-        ZNRecordRow.MAP_SUBKEY   + " IN ((SELECT [*]id FROM :LIVEINSTANCES))";
-        
-    
-    String queryTarget = recipientCriteria.getDataSource().toString() + ClusterJosqlQueryProcessor.FLATTABLE;
-    
-    String josql = "SELECT DISTINCT " + queryFields
-                 + " FROM " + queryTarget + " WHERE "
-                 + matchCondition;
-    ClusterJosqlQueryProcessor p = new ClusterJosqlQueryProcessor(manager);
-    List<Object> result = new ArrayList<Object>();
-    try
-    {
-      logger.info("JOSQL query: " + josql);
-      result = p.runJoSqlQuery(josql, null, null);
-    } 
-    catch (Exception e)
-    {
-      logger.error("", e);
-      return selected;
-    } 
-    
-    for(Object o : result)
-    {
-      Map<String, String> resultRow = new HashMap<String, String>();
-      List<Object> row = (List<Object>)o;
-      resultRow.put("instanceName", (String)(row.get(0)));
-      resultRow.put("resourceName", (String)(row.get(1)));
-      resultRow.put("partitionName", (String)(row.get(2)));
-      resultRow.put("partitionState", (String)(row.get(3)));
-      selected.add(resultRow);
-    }
-    logger.info("JOSQL query return " + selected.size() + " rows");
-    return selected;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/com/linkedin/helix/messaging/DefaultMessagingService.java
deleted file mode 100644
index d98b3a4..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/messaging/DefaultMessagingService.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ClusterMessagingService;
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScopeBuilder;
-import com.linkedin.helix.Criteria;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.messaging.handling.AsyncCallbackService;
-import com.linkedin.helix.messaging.handling.HelixTaskExecutor;
-import com.linkedin.helix.messaging.handling.MessageHandlerFactory;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageType;
-
-public class DefaultMessagingService implements ClusterMessagingService
-{
-  private final HelixManager         _manager;
-  private final CriteriaEvaluator    _evaluator;
-  private final HelixTaskExecutor    _taskExecutor;
-  // TODO:rename to factory, this is not a service
-  private final AsyncCallbackService _asyncCallbackService;
-  private static Logger              _logger =
-                                                 Logger.getLogger(DefaultMessagingService.class);
-  ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded
-    = new ConcurrentHashMap<String, MessageHandlerFactory>();
-  
-  public DefaultMessagingService(HelixManager manager)
-  {
-    _manager = manager;
-    _evaluator = new CriteriaEvaluator();
-    _taskExecutor = new HelixTaskExecutor();
-    _asyncCallbackService = new AsyncCallbackService();
-    _taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(),
-                                                _asyncCallbackService);
-  }
-
-  @Override
-  public int send(Criteria recipientCriteria, final Message messageTemplate)
-  {
-    return send(recipientCriteria, messageTemplate, null, -1);
-  }
-
-  @Override
-  public int send(final Criteria recipientCriteria,
-                  final Message message,
-                  AsyncCallback callbackOnReply,
-                  int timeOut)
-  {
-    return send(recipientCriteria, message, callbackOnReply, timeOut, 0);
-  }
-
-  @Override
-  public int send(final Criteria recipientCriteria,
-                  final Message message,
-                  AsyncCallback callbackOnReply,
-                  int timeOut,
-                  int retryCount)
-  {
-    Map<InstanceType, List<Message>> generateMessage =
-        generateMessage(recipientCriteria, message);
-    int totalMessageCount = 0;
-    for (List<Message> messages : generateMessage.values())
-    {
-      totalMessageCount += messages.size();
-    }
-    _logger.info("Send " + totalMessageCount + " messages with criteria "
-        + recipientCriteria);
-    if (totalMessageCount == 0)
-    {
-      return 0;
-    }
-    String correlationId = null;
-    if (callbackOnReply != null)
-    {
-      int totalTimeout = timeOut * (retryCount + 1);
-      if (totalTimeout < 0)
-      {
-        totalTimeout = -1;
-      }
-      callbackOnReply.setTimeout(totalTimeout);
-      correlationId = UUID.randomUUID().toString();
-      for (List<Message> messages : generateMessage.values())
-      {
-        callbackOnReply.setMessagesSent(messages);
-      }
-      _asyncCallbackService.registerAsyncCallback(correlationId, callbackOnReply);
-    }
-
-    for (InstanceType receiverType : generateMessage.keySet())
-    {
-      List<Message> list = generateMessage.get(receiverType);
-      for (Message tempMessage : list)
-      {
-        tempMessage.setRetryCount(retryCount);
-        tempMessage.setExecutionTimeout(timeOut);
-        tempMessage.setSrcInstanceType(_manager.getInstanceType());
-        if (correlationId != null)
-        {
-          tempMessage.setCorrelationId(correlationId);
-        }
-
-        HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-        Builder keyBuilder = accessor.keyBuilder();
-
-        if (receiverType == InstanceType.CONTROLLER)
-        {
-          // _manager.getDataAccessor().setProperty(PropertyType.MESSAGES_CONTROLLER,
-          // tempMessage,
-          // tempMessage.getId());
-          accessor.setProperty(keyBuilder.controllerMessage(tempMessage.getId()),
-                               tempMessage);
-        }
-
-        if (receiverType == InstanceType.PARTICIPANT)
-        {
-          accessor.setProperty(keyBuilder.message(tempMessage.getTgtName(),
-                                                  tempMessage.getId()),
-                               tempMessage);
-        }
-      }
-    }
-
-    if (callbackOnReply != null)
-    {
-      // start timer if timeout is set
-      callbackOnReply.startTimer();
-    }
-    return totalMessageCount;
-  }
-
-  private Map<InstanceType, List<Message>> generateMessage(final Criteria recipientCriteria,
-                                                           final Message message)
-  {
-    Map<InstanceType, List<Message>> messagesToSendMap =
-        new HashMap<InstanceType, List<Message>>();
-    InstanceType instanceType = recipientCriteria.getRecipientInstanceType();
-
-    if (instanceType == InstanceType.CONTROLLER)
-    {
-      List<Message> messages = generateMessagesForController(message);
-      messagesToSendMap.put(InstanceType.CONTROLLER, messages);
-      // _dataAccessor.setControllerProperty(PropertyType.MESSAGES,
-      // newMessage.getRecord(), CreateMode.PERSISTENT);
-    }
-    else if (instanceType == InstanceType.PARTICIPANT)
-    {
-      List<Message> messages = new ArrayList<Message>();
-      List<Map<String, String>> matchedList =
-          _evaluator.evaluateCriteria(recipientCriteria, _manager);
-
-      if (!matchedList.isEmpty())
-      {
-        Map<String, String> sessionIdMap = new HashMap<String, String>();
-        if (recipientCriteria.isSessionSpecific())
-        {
-          HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-          Builder keyBuilder = accessor.keyBuilder();
-
-          List<LiveInstance> liveInstances =
-              accessor.getChildValues(keyBuilder.liveInstances());
-
-          for (LiveInstance liveInstance : liveInstances)
-          {
-            sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
-          }
-        }
-        for (Map<String, String> map : matchedList)
-        {
-          String id = UUID.randomUUID().toString();
-          Message newMessage = new Message(message.getRecord(), id);
-          String srcInstanceName = _manager.getInstanceName();
-          String tgtInstanceName = map.get("instanceName");
-          // Don't send message to self
-          if (recipientCriteria.isSelfExcluded()
-              && srcInstanceName.equalsIgnoreCase(tgtInstanceName))
-          {
-            continue;
-          }
-          newMessage.setSrcName(srcInstanceName);
-          newMessage.setTgtName(tgtInstanceName);
-          newMessage.setResourceName(map.get("resourceName"));
-          newMessage.setPartitionName(map.get("partitionName"));
-          if (recipientCriteria.isSessionSpecific())
-          {
-            newMessage.setTgtSessionId(sessionIdMap.get(tgtInstanceName));
-          }
-          messages.add(newMessage);
-        }
-        messagesToSendMap.put(InstanceType.PARTICIPANT, messages);
-      }
-    }
-    return messagesToSendMap;
-  }
-
-  private List<Message> generateMessagesForController(Message message)
-  {
-    List<Message> messages = new ArrayList<Message>();
-    String id = UUID.randomUUID().toString();
-    Message newMessage = new Message(message.getRecord(), id);
-    newMessage.setMsgId(id);
-    newMessage.setSrcName(_manager.getInstanceName());
-    newMessage.setTgtName("Controller");
-    messages.add(newMessage);
-    return messages;
-  }
-  
-  @Override
-  public synchronized void registerMessageHandlerFactory(String type, MessageHandlerFactory factory)
-  {
-    if (_manager.isConnected())
-    {
-      registerMessageHandlerFactoryInternal(type, factory);
-    }
-    else
-    {
-      _messageHandlerFactoriestobeAdded.put(type, factory);
-    }
-  }
-  
-  public synchronized void onConnected()
-  {
-    for(String type : _messageHandlerFactoriestobeAdded.keySet())
-    {
-      registerMessageHandlerFactoryInternal(type, _messageHandlerFactoriestobeAdded.get(type));
-    }
-    _messageHandlerFactoriestobeAdded.clear();
-  }
-  
-  void registerMessageHandlerFactoryInternal(String type, MessageHandlerFactory factory)
-  {
-    _logger.info("registering msg factory for type " + type);
-    int threadpoolSize = HelixTaskExecutor.DEFAULT_PARALLEL_TASKS;
-    String threadpoolSizeStr = null;
-    String key = type + "." + HelixTaskExecutor.MAX_THREADS;
-    
-    ConfigAccessor configAccessor = _manager.getConfigAccessor();
-    if(configAccessor != null)
-    {
-      ConfigScope scope = null;
-      
-      // Read the participant config and cluster config for the per-message type thread pool size.
-      // participant config will override the cluster config.
-      
-      if(_manager.getInstanceType() == InstanceType.PARTICIPANT || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
-      { 
-        scope = new ConfigScopeBuilder().forCluster(_manager.getClusterName()).forParticipant(_manager.getInstanceName()).build();
-        threadpoolSizeStr = configAccessor.get(scope, key);
-      }
-      
-      if(threadpoolSizeStr == null)
-      {
-        scope = new ConfigScopeBuilder().forCluster(_manager.getClusterName()).build();
-        threadpoolSizeStr = configAccessor.get(scope, key);
-      }
-    }
-    
-    if(threadpoolSizeStr != null)
-    {
-      try
-      {
-        threadpoolSize = Integer.parseInt(threadpoolSizeStr);
-        if(threadpoolSize <= 0)
-        {
-          threadpoolSize = 1;
-        }
-      }
-      catch(Exception e)
-      {
-        _logger.error("", e);
-      }
-    }
-    
-    _taskExecutor.registerMessageHandlerFactory(type, factory, threadpoolSize);
-    // Self-send a no-op message, so that the onMessage() call will be invoked
-    // again, and
-    // we have a chance to process the message that we received with the new
-    // added MessageHandlerFactory
-    // before the factory is added.
-    sendNopMessage();
-  }
-
-  public void sendNopMessage()
-  {
-    if (_manager.isConnected())
-    {
-      try
-      {
-        Message nopMsg = new Message(MessageType.NO_OP, UUID.randomUUID().toString());
-        nopMsg.setSrcName(_manager.getInstanceName());
-
-        HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-        Builder keyBuilder = accessor.keyBuilder();
-
-        if (_manager.getInstanceType() == InstanceType.CONTROLLER
-            || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
-        {
-          nopMsg.setTgtName("Controller");
-          accessor.setProperty(keyBuilder.controllerMessage(nopMsg.getId()), nopMsg);
-        }
-
-        if (_manager.getInstanceType() == InstanceType.PARTICIPANT
-            || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
-        {
-          nopMsg.setTgtName(_manager.getInstanceName());
-          accessor.setProperty(keyBuilder.message(nopMsg.getTgtName(), nopMsg.getId()),
-                               nopMsg);
-        }
-      }
-      catch (Exception e)
-      {
-        _logger.error(e);
-      }
-    }
-  }
-
-  public HelixTaskExecutor getExecutor()
-  {
-    return _taskExecutor;
-  }
-
-  @Override
-  public int sendAndWait(Criteria receipientCriteria,
-                         Message message,
-                         AsyncCallback asyncCallback,
-                         int timeOut,
-                         int retryCount)
-  {
-    int messagesSent =
-        send(receipientCriteria, message, asyncCallback, timeOut, retryCount);
-    if (messagesSent > 0)
-    {
-      while (!asyncCallback.isDone() && !asyncCallback.isTimedOut())
-      {
-        synchronized (asyncCallback)
-        {
-          try
-          {
-            asyncCallback.wait();
-          }
-          catch (InterruptedException e)
-          {
-            _logger.error(e);
-            asyncCallback.setInterrupted(true);
-            break;
-          }
-        }
-      }
-    }
-    else
-    {
-      _logger.warn("No messages sent. For Criteria:" + receipientCriteria);
-    }
-    return messagesSent;
-  }
-
-  @Override
-  public int sendAndWait(Criteria recipientCriteria,
-                         Message message,
-                         AsyncCallback asyncCallback,
-                         int timeOut)
-  {
-    return sendAndWait(recipientCriteria, message, asyncCallback, timeOut, 0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/messaging/handling/AsyncCallbackService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/AsyncCallbackService.java b/helix-core/src/main/java/com/linkedin/helix/messaging/handling/AsyncCallbackService.java
deleted file mode 100644
index 078ce08..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/AsyncCallbackService.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging.handling;
-
-import java.util.Collection;
-import java.util.Date;
-import java.util.Map;
-import java.util.Random;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.messaging.AsyncCallback;
-import com.linkedin.helix.messaging.handling.MessageHandler.ErrorCode;
-import com.linkedin.helix.messaging.handling.MessageHandler.ErrorType;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.participant.HelixStateMachineEngine;
-
-public class AsyncCallbackService implements MessageHandlerFactory
-{
-  private final ConcurrentHashMap<String, AsyncCallback> _callbackMap = new ConcurrentHashMap<String, AsyncCallback>();
-  private static Logger _logger = Logger.getLogger(AsyncCallbackService.class);
-
-  public AsyncCallbackService()
-  {
-  }
-
-  public void registerAsyncCallback(String correlationId, AsyncCallback callback)
-  {
-    if (_callbackMap.containsKey(correlationId))
-    {
-      _logger.warn("correlation id " + correlationId + " already registered");
-    }
-    _logger.info("registering correlation id " + correlationId);
-    _callbackMap.put(correlationId, callback);
-  }
-
-  void verifyMessage(Message message)
-  {
-    if (!message.getMsgType().toString()
-        .equalsIgnoreCase(MessageType.TASK_REPLY.toString()))
-    {
-      String errorMsg = "Unexpected msg type for message " + message.getMsgId()
-          + " type:" + message.getMsgType() + " Expected : "
-          + MessageType.TASK_REPLY;
-      _logger.error(errorMsg);
-      throw new HelixException(errorMsg);
-    }
-    String correlationId = message.getCorrelationId();
-    if (correlationId == null)
-    {
-      String errorMsg = "Message " + message.getMsgId()
-          + " does not have correlation id";
-      _logger.error(errorMsg);
-      throw new HelixException(errorMsg);
-    }
-
-    if (!_callbackMap.containsKey(correlationId))
-    {
-      String errorMsg = "Message "
-          + message.getMsgId()
-          + " does not have correponding callback. Probably timed out already. Correlation id: "
-          + correlationId;
-      _logger.error(errorMsg);
-      throw new HelixException(errorMsg);
-    }
-    _logger.info("Verified reply message " + message.getMsgId()
-        + " correlation:" + correlationId);
-  }
-
-  @Override
-  public MessageHandler createHandler(Message message,
-      NotificationContext context)
-  {
-    verifyMessage(message);
-    return new AsyncCallbackMessageHandler(message.getCorrelationId(),message, context);
-  }
-
-  @Override
-  public String getMessageType()
-  {
-    return MessageType.TASK_REPLY.toString();
-  }
-
-  @Override
-  public void reset()
-  {
-
-  }
-
-  public class AsyncCallbackMessageHandler extends MessageHandler
-  {
-    private final String _correlationId;
-
-    public AsyncCallbackMessageHandler(String correlationId, Message message, NotificationContext context)
-    {
-      super(message, context);
-      _correlationId = correlationId;
-    }
-
-    @Override
-    public HelixTaskResult handleMessage() throws InterruptedException
-    {
-      verifyMessage(_message);
-      HelixTaskResult result = new HelixTaskResult();
-      assert (_correlationId.equalsIgnoreCase(_message.getCorrelationId()));
-      _logger.info("invoking reply message " + _message.getMsgId()
-          + ", correlationid:" + _correlationId);
-
-      AsyncCallback callback = _callbackMap.get(_correlationId);
-      synchronized (callback)
-      {
-        callback.onReply(_message);
-        if (callback.isDone())
-        {
-          _logger.info("Removing finished callback, correlationid:"
-              + _correlationId);
-          _callbackMap.remove(_correlationId);
-        }
-      }
-      result.setSuccess(true);
-      return result;
-    }
-
-    @Override
-    public void onError(Exception e, ErrorCode code, ErrorType type)
-    {
-      _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/messaging/handling/GroupMessageHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/GroupMessageHandler.java b/helix-core/src/main/java/com/linkedin/helix/messaging/handling/GroupMessageHandler.java
deleted file mode 100644
index 4b2dfcd..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/GroupMessageHandler.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package com.linkedin.helix.messaging.handling;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.Attributes;
-
-public class GroupMessageHandler
-{
-  class CurrentStateUpdate
-  {
-    final PropertyKey  _key;
-    final CurrentState _curStateDelta;
-
-    public CurrentStateUpdate(PropertyKey key, CurrentState curStateDelta)
-    {
-      _key = key;
-      _curStateDelta = curStateDelta;
-    }
-
-    public void merge(CurrentState curState)
-    {
-      _curStateDelta.getRecord().merge(curState.getRecord());
-    }
-  }
-
-  static class GroupMessageInfo
-  {
-    final Message                                   _message;
-    final AtomicInteger                             _countDown;
-    final ConcurrentLinkedQueue<CurrentStateUpdate> _curStateUpdateList;
-
-    public GroupMessageInfo(Message message)
-    {
-      _message = message;
-      List<String> partitionNames = message.getPartitionNames();
-      _countDown = new AtomicInteger(partitionNames.size());
-      _curStateUpdateList = new ConcurrentLinkedQueue<CurrentStateUpdate>();
-    }
-    
-    public Map<PropertyKey, CurrentState> merge()
-    {
-      Map<String, CurrentStateUpdate> curStateUpdateMap =
-          new HashMap<String, CurrentStateUpdate>();
-      for (CurrentStateUpdate update : _curStateUpdateList)
-      {
-        String path = update._key.getPath();
-        if (!curStateUpdateMap.containsKey(path))
-        {
-          curStateUpdateMap.put(path, update);
-        }
-        else
-        {
-          curStateUpdateMap.get(path).merge(update._curStateDelta);
-        }
-      }
-
-      Map<PropertyKey, CurrentState> ret = new HashMap<PropertyKey, CurrentState>();
-      for (CurrentStateUpdate update : curStateUpdateMap.values())
-      {
-        ret.put(update._key, update._curStateDelta);
-      }
-
-      return ret;
-    }
- 
-  }
-
-  final ConcurrentHashMap<String, GroupMessageInfo> _groupMsgMap;
-
-  public GroupMessageHandler()
-  {
-    _groupMsgMap = new ConcurrentHashMap<String, GroupMessageInfo>();
-  }
-
-  public void put(Message message)
-  {
-    _groupMsgMap.putIfAbsent(message.getId(), new GroupMessageInfo(message));
-  }
-
-  // return non-null if all sub-messages are completed
-  public GroupMessageInfo onCompleteSubMessage(Message subMessage)
-  {
-    String parentMid = subMessage.getAttribute(Attributes.PARENT_MSG_ID);
-    GroupMessageInfo info = _groupMsgMap.get(parentMid);
-    if (info != null)
-    {
-      int val = info._countDown.decrementAndGet();
-      if (val <= 0)
-      {
-        return _groupMsgMap.remove(parentMid);
-      }
-    }
-
-    return null;
-  }
-
-  void addCurStateUpdate(Message subMessage, PropertyKey key, CurrentState delta)
-  {
-    String parentMid = subMessage.getAttribute(Attributes.PARENT_MSG_ID);
-    GroupMessageInfo info = _groupMsgMap.get(parentMid);
-    if (info != null)
-    {
-      info._curStateUpdateList.add(new CurrentStateUpdate(key, delta));
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixStateTransitionHandler.java
deleted file mode 100644
index 0df7e16..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixStateTransitionHandler.java
+++ /dev/null
@@ -1,388 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging.handling;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.ZNRecordBucketizer;
-import com.linkedin.helix.ZNRecordDelta;
-import com.linkedin.helix.ZNRecordDelta.MergeOperation;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.participant.statemachine.StateModel;
-import com.linkedin.helix.participant.statemachine.StateModelParser;
-import com.linkedin.helix.participant.statemachine.StateTransitionError;
-import com.linkedin.helix.util.StatusUpdateUtil;
-
-public class HelixStateTransitionHandler extends MessageHandler
-{
-  public static class HelixStateMismatchException extends Exception
-  {
-    public HelixStateMismatchException(String info)
-    {
-      super(info);
-    }
-  }
-  private static Logger          logger     =
-                                                Logger.getLogger(HelixStateTransitionHandler.class);
-  private final StateModel       _stateModel;
-  StatusUpdateUtil               _statusUpdateUtil;
-  private final StateModelParser _transitionMethodFinder;
-  private final CurrentState     _currentStateDelta;
-  volatile boolean               _isTimeout = false;
-  private final HelixTaskExecutor              _executor;
-
-  public HelixStateTransitionHandler(StateModel stateModel,
-                                     Message message,
-                                     NotificationContext context,
-                                     CurrentState currentStateDelta,
-                                     HelixTaskExecutor executor)
-  {
-    super(message, context);
-    _stateModel = stateModel;
-    _statusUpdateUtil = new StatusUpdateUtil();
-    _transitionMethodFinder = new StateModelParser();
-    _currentStateDelta = currentStateDelta;
-    _executor = executor;
-  }
-
-  private void prepareMessageExecution(HelixManager manager, Message message) throws HelixException,
-      HelixStateMismatchException
-  {
-    if (!message.isValid())
-    {
-      String errorMessage =
-          "Invalid Message, ensure that message: " + message
-              + " has all the required fields: "
-              + Arrays.toString(Message.Attributes.values());
-
-      _statusUpdateUtil.logError(message,
-                                 HelixStateTransitionHandler.class,
-                                 errorMessage,
-                                 manager.getHelixDataAccessor());
-      logger.error(errorMessage);
-      throw new HelixException(errorMessage);
-    }
-    // DataAccessor accessor = manager.getDataAccessor();
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-
-    String partitionName = message.getPartitionName();
-    String fromState = message.getFromState();
-
-    // Verify the fromState and current state of the stateModel
-    String state = _currentStateDelta.getState(partitionName);
-
-    if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state))
-    {
-      String errorMessage =
-          "Current state of stateModel does not match the fromState in Message"
-              + ", Current State:" + state + ", message expected:" + fromState
-              + ", partition: " + partitionName + ", from: " + message.getMsgSrc()
-              + ", to: " + message.getTgtName();
-
-      _statusUpdateUtil.logError(message,
-                                 HelixStateTransitionHandler.class,
-                                 errorMessage,
-                                 accessor);
-      logger.error(errorMessage);
-      throw new HelixStateMismatchException(errorMessage);
-    }
-  }
-
-  void postExecutionMessage(HelixManager manager,
-                            Message message,
-                            NotificationContext context,
-                            HelixTaskResult taskResult,
-                            Exception exception)
-  {
-    String partitionKey = message.getPartitionName();
-    String resource = message.getResourceName();
-    String sessionId = message.getTgtSessionId();
-    String instanceName = manager.getInstanceName();
-
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = accessor.keyBuilder();
-
-    int bucketSize = message.getBucketSize();
-    ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(bucketSize);
-
-    // Lock the helix manager so that the session id will not change when we update
-    // the state model state. for zk current state it is OK as we have the per-session
-    // current state node
-    synchronized (manager)
-    {
-      if (!message.getTgtSessionId().equals(manager.getSessionId()))
-      {
-        logger.warn("Session id has changed. Skip postExecutionMessage. Old session "
-            + message.getExecutionSessionId() + " , new session : "
-            + manager.getSessionId());
-        return;
-      }
-
-      if (taskResult.isSucess())
-      {
-        // String fromState = message.getFromState();
-        String toState = message.getToState();
-        _currentStateDelta.setState(partitionKey, toState);
-
-        if (toState.equalsIgnoreCase("DROPPED"))
-        {
-          // for "OnOfflineToDROPPED" message, we need to remove the resource key record
-          // from the current state of the instance because the resource key is dropped.
-          // In the state model it will be stayed as "OFFLINE", which is OK.
-          ZNRecordDelta delta =
-              new ZNRecordDelta(_currentStateDelta.getRecord(), MergeOperation.SUBTRACT);
-          // Don't subtract simple fields since they contain stateModelDefRef
-          delta._record.getSimpleFields().clear();
-
-          List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
-          deltaList.add(delta);
-          _currentStateDelta.setDeltaList(deltaList);
-        }
-        else
-        {
-          // if the partition is not to be dropped, update _stateModel to the TO_STATE
-          _stateModel.updateState(toState);
-        }
-      }
-      else
-      {
-        if (exception instanceof HelixStateMismatchException)
-        {
-          // if fromState mismatch, set current state on zk to stateModel's current state
-          logger.warn("Force CurrentState on Zk to be stateModel's CurrentState. partitionKey: "
-              + partitionKey
-              + ", currentState: "
-              + _stateModel.getCurrentState()
-              + ", message: " + message);
-          _currentStateDelta.setState(partitionKey, _stateModel.getCurrentState());
-        }
-        else
-        {
-          StateTransitionError error =
-              new StateTransitionError(ErrorType.INTERNAL, ErrorCode.ERROR, exception);
-          if (exception instanceof InterruptedException)
-          {
-            if (_isTimeout)
-            {
-              error =
-                  new StateTransitionError(ErrorType.INTERNAL,
-                                           ErrorCode.TIMEOUT,
-                                           exception);
-            }
-            else
-            {
-              // State transition interrupted but not caused by timeout. Keep the current
-              // state in this case
-              logger.error("State transition interrupted but not timeout. Not updating state. Partition : "
-                  + message.getPartitionName() + " MsgId : " + message.getMsgId());
-              return;
-            }
-          }
-          _stateModel.rollbackOnError(message, context, error);
-          _currentStateDelta.setState(partitionKey, "ERROR");
-          _stateModel.updateState("ERROR");
-        }
-      }
-    }
-    try
-    {
-      // Update the ZK current state of the node
-      PropertyKey key = keyBuilder.currentState(instanceName,
-                              sessionId,
-                              resource,
-                              bucketizer.getBucketName(partitionKey));
-      if (!_message.getGroupMessageMode())
-      {
-        accessor.updateProperty(key, _currentStateDelta);
-      }
-      else
-      {
-        _executor._groupMsgHandler.addCurStateUpdate(_message, key, _currentStateDelta);
-      }
-    }
-    catch (Exception e)
-    {
-      logger.error("Error when updating the state ", e);
-      StateTransitionError error =
-          new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
-      _stateModel.rollbackOnError(message, context, error);
-      _statusUpdateUtil.logError(message,
-                                 HelixStateTransitionHandler.class,
-                                 e,
-                                 "Error when update the state ",
-                                 accessor);
-    }
-  }
-
-  public HelixTaskResult handleMessageInternal(Message message,
-                                               NotificationContext context)
-  {
-    synchronized (_stateModel)
-    {
-      HelixTaskResult taskResult = new HelixTaskResult();
-      HelixManager manager = context.getManager();
-      HelixDataAccessor accessor = manager.getHelixDataAccessor();
-
-      _statusUpdateUtil.logInfo(message,
-                                HelixStateTransitionHandler.class,
-                                "Message handling task begin execute",
-                                accessor);
-      message.setExecuteStartTimeStamp(new Date().getTime());
-
-      Exception exception = null;
-      try
-      {
-        prepareMessageExecution(manager, message);
-        invoke(accessor, context, taskResult, message);
-      }
-      catch (HelixStateMismatchException e)
-      {
-        // Simply log error and return from here if State mismatch.
-        // The current state of the state model is intact.
-        taskResult.setSuccess(false);
-        taskResult.setMessage(e.toString());
-        taskResult.setException(e);
-        exception = e;
-        // return taskResult;
-      }
-      catch (Exception e)
-      {
-        String errorMessage =
-            "Exception while executing a state transition task "
-                + message.getPartitionName();
-        logger.error(errorMessage, e);
-        if (e.getCause() != null && e.getCause() instanceof InterruptedException)
-        {
-          e = (InterruptedException) e.getCause();
-        }
-        _statusUpdateUtil.logError(message,
-                                   HelixStateTransitionHandler.class,
-                                   e,
-                                   errorMessage,
-                                   accessor);
-        taskResult.setSuccess(false);
-        taskResult.setMessage(e.toString());
-        taskResult.setException(e);
-        taskResult.setInterrupted(e instanceof InterruptedException);
-        exception = e;
-      }
-      postExecutionMessage(manager, message, context, taskResult, exception);
-
-      return taskResult;
-    }
-  }
-
-  private void invoke(HelixDataAccessor accessor,
-                      NotificationContext context,
-                      HelixTaskResult taskResult,
-                      Message message) throws IllegalAccessException,
-      InvocationTargetException,
-      InterruptedException
-  {
-    _statusUpdateUtil.logInfo(message,
-                              HelixStateTransitionHandler.class,
-                              "Message handling invoking",
-                              accessor);
-
-    // by default, we invoke state transition function in state model
-    Method methodToInvoke = null;
-    String fromState = message.getFromState();
-    String toState = message.getToState();
-    methodToInvoke =
-        _transitionMethodFinder.getMethodForTransition(_stateModel.getClass(),
-                                                       fromState,
-                                                       toState,
-                                                       new Class[] { Message.class,
-                                                           NotificationContext.class });
-    if (methodToInvoke != null)
-    {
-      methodToInvoke.invoke(_stateModel, new Object[] { message, context });
-      taskResult.setSuccess(true);
-    }
-    else
-    {
-      String errorMessage =
-          "Unable to find method for transition from " + fromState + " to " + toState
-              + "in " + _stateModel.getClass();
-      logger.error(errorMessage);
-      taskResult.setSuccess(false);
-
-      _statusUpdateUtil.logError(message,
-                                 HelixStateTransitionHandler.class,
-                                 errorMessage,
-                                 accessor);
-    }
-  }
-
-  @Override
-  public HelixTaskResult handleMessage()
-  {
-    return handleMessageInternal(_message, _notificationContext);
-  }
-
-  @Override
-  public void onError(Exception e, ErrorCode code, ErrorType type)
-  {
-    // All internal error has been processed already, so we can skip them
-    if (type == ErrorType.INTERNAL)
-    {
-      logger.error("Skip internal error " + e.getMessage() + " " + code);
-      return;
-    }
-    HelixManager manager = _notificationContext.getManager();
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = accessor.keyBuilder();
-
-    String instanceName = manager.getInstanceName();
-    String partition = _message.getPartitionName();
-    String resourceName = _message.getResourceName();
-    CurrentState currentStateDelta = new CurrentState(resourceName);
-
-    StateTransitionError error = new StateTransitionError(type, code, e);
-    _stateModel.rollbackOnError(_message, _notificationContext, error);
-    // if the transition is not canceled, it should go into error state
-    if (code == ErrorCode.ERROR)
-    {
-      currentStateDelta.setState(partition, "ERROR");
-      _stateModel.updateState("ERROR");
-
-      accessor.updateProperty(keyBuilder.currentState(instanceName,
-                                                      _message.getTgtSessionId(),
-                                                      resourceName),
-                              currentStateDelta);
-    }
-  }
-
-  @Override
-  public void onTimeout()
-  {
-    _isTimeout = true;
-  }
-};

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixTask.java
deleted file mode 100644
index 73c622c..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixTask.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging.handling;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.Callable;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.messaging.handling.GroupMessageHandler.GroupMessageInfo;
-import com.linkedin.helix.messaging.handling.MessageHandler.ErrorCode;
-import com.linkedin.helix.messaging.handling.MessageHandler.ErrorType;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.monitoring.StateTransitionContext;
-import com.linkedin.helix.monitoring.StateTransitionDataPoint;
-import com.linkedin.helix.util.StatusUpdateUtil;
-
-public class HelixTask implements Callable<HelixTaskResult>
-{
-  private static Logger             logger     = Logger.getLogger(HelixTask.class);
-  private final Message             _message;
-  private final MessageHandler      _handler;
-  private final NotificationContext _notificationContext;
-  private final HelixManager        _manager;
-  StatusUpdateUtil                  _statusUpdateUtil;
-  HelixTaskExecutor                 _executor;
-  volatile boolean                  _isTimeout = false;
-
-  public class TimeoutCancelTask extends TimerTask
-  {
-    HelixTaskExecutor   _executor;
-    Message             _message;
-    NotificationContext _context;
-
-    public TimeoutCancelTask(HelixTaskExecutor executor,
-                             Message message,
-                             NotificationContext context)
-    {
-      _executor = executor;
-      _message = message;
-      _context = context;
-    }
-
-    @Override
-    public void run()
-    {
-      _isTimeout = true;
-      logger.warn("Message time out, canceling. id:" + _message.getMsgId()
-          + " timeout : " + _message.getExecutionTimeout());
-      _handler.onTimeout();
-      _executor.cancelTask(_message, _context);
-    }
-
-  }
-
-  public HelixTask(Message message,
-                   NotificationContext notificationContext,
-                   MessageHandler handler,
-                   HelixTaskExecutor executor) throws Exception
-  {
-    this._notificationContext = notificationContext;
-    this._message = message;
-    this._handler = handler;
-    this._manager = notificationContext.getManager();
-    _statusUpdateUtil = new StatusUpdateUtil();
-    _executor = executor;
-  }
-
-  @Override
-  public HelixTaskResult call()
-  {
-    // Start the timeout TimerTask, if necessary
-    Timer timer = null;
-    if (_message.getExecutionTimeout() > 0)
-    {
-      timer = new Timer(true);
-      timer.schedule(new TimeoutCancelTask(_executor, _message, _notificationContext),
-                     _message.getExecutionTimeout());
-      logger.info("Message starts with timeout " + _message.getExecutionTimeout()
-          + " MsgId:" + _message.getMsgId());
-    }
-    else
-    {
-      logger.info("Message does not have timeout. MsgId:" + _message.getMsgId() + "/"
-          + _message.getPartitionName());
-    }
-
-    HelixTaskResult taskResult = new HelixTaskResult();
-
-    Exception exception = null;
-    ErrorType type = ErrorType.INTERNAL;
-    ErrorCode code = ErrorCode.ERROR;
-
-    long start = System.currentTimeMillis();
-    logger.info("msg:" + _message.getMsgId() + " handling task begin, at: " + start);
-    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-    _statusUpdateUtil.logInfo(_message,
-                              HelixTask.class,
-                              "Message handling task begin execute",
-                              accessor);
-    _message.setExecuteStartTimeStamp(new Date().getTime());
-
-    // Handle the message
-    try
-    {
-      taskResult = _handler.handleMessage();
-      exception = taskResult.getException();
-    }
-    catch (InterruptedException e)
-    {
-      _statusUpdateUtil.logError(_message,
-                                 HelixTask.class,
-                                 e,
-                                 "State transition interrupted, timeout:" + _isTimeout,
-                                 accessor);
-      logger.info("Message " + _message.getMsgId() + " is interrupted");
-      taskResult.setInterrupted(true);
-      taskResult.setException(e);
-      exception = e;
-    }
-    catch (Exception e)
-    {
-      String errorMessage =
-          "Exception while executing a message. " + e + " msgId: " + _message.getMsgId()
-              + " type: " + _message.getMsgType();
-      logger.error(errorMessage, e);
-      _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, accessor);
-      taskResult.setSuccess(false);
-      taskResult.setException(e);
-      taskResult.setMessage(e.getMessage());
-      exception = e;
-    }
-
-    // Cancel the timer since the handling is done
-    // it is fine if the TimerTask for canceling is called already
-    if (timer != null)
-    {
-      timer.cancel();
-    }
-
-    if (taskResult.isSucess())
-    {
-      _statusUpdateUtil.logInfo(_message,
-                                _handler.getClass(),
-                                "Message handling task completed successfully",
-                                accessor);
-      logger.info("Message " + _message.getMsgId() + " completed.");
-    }
-    else if (taskResult.isInterrupted())
-    {
-      logger.info("Message " + _message.getMsgId() + " is interrupted");
-      code = _isTimeout ? ErrorCode.TIMEOUT : ErrorCode.CANCEL;
-      if (_isTimeout)
-      {
-        int retryCount = _message.getRetryCount();
-        logger.info("Message timeout, retry count: " + retryCount + " MSGID:"
-            + _message.getMsgId());
-        _statusUpdateUtil.logInfo(_message,
-                                  _handler.getClass(),
-                                  "Message handling task timeout, retryCount:"
-                                      + retryCount,
-                                  accessor);
-        // Notify the handler that timeout happens, and the number of retries left
-        // In case timeout happens (time out and also interrupted)
-        // we should retry the execution of the message by re-schedule it in
-        if (retryCount > 0)
-        {
-          _message.setRetryCount(retryCount - 1);
-          _executor.scheduleTask(_message, _handler, _notificationContext);
-          return taskResult;
-        }
-      }
-    }
-    else
-    // logging for errors
-    {
-      String errorMsg =
-          "Message execution failed. msgId: " + _message.getMsgId()
-              + taskResult.getMessage();
-      if (exception != null)
-      {
-        errorMsg += exception;
-      }
-      logger.error(errorMsg, exception);
-      _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, accessor);
-    }
-
-    // Post-processing for the finished task
-    try
-    {
-      if (!_message.getGroupMessageMode())
-      {
-        removeMessageFromZk(accessor, _message);
-        reportMessageStat(_manager, _message, taskResult);
-        sendReply(accessor, _message, taskResult);
-      }
-      else
-      {
-        GroupMessageInfo info = _executor._groupMsgHandler.onCompleteSubMessage(_message); 
-        if (info != null)
-        {
-          // TODO: changed to async update
-          // group update current state
-          Map<PropertyKey, CurrentState> curStateMap = info.merge();
-          for (PropertyKey key : curStateMap.keySet())
-          {
-            accessor.updateProperty(key, curStateMap.get(key));
-          }
-
-          // remove group message
-          removeMessageFromZk(accessor, _message);
-          reportMessageStat(_manager, _message, taskResult);
-          sendReply(accessor, _message, taskResult);
-        }
-      }
-      _executor.reportCompletion(_message);
-    }
-
-    // TODO: capture errors and log here
-    catch (Exception e)
-    {
-      String errorMessage =
-          "Exception after executing a message, msgId: " + _message.getMsgId() + e;
-      logger.error(errorMessage, e);
-      _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, accessor);
-      exception = e;
-      type = ErrorType.FRAMEWORK;
-      code = ErrorCode.ERROR;
-    }
-    //
-    finally
-    {
-      long end = System.currentTimeMillis();
-      logger.info("msg:" + _message.getMsgId() + " handling task completed, results:"
-          + taskResult.isSucess() + ", at: " + end + ", took:" + (end - start));
-
-      // Notify the handler about any error happened in the handling procedure, so that
-      // the handler have chance to finally cleanup
-      if (exception != null)
-      {
-        _handler.onError(exception, code, type);
-      }
-    }
-    return taskResult;
-  }
-
-  private void removeMessageFromZk(HelixDataAccessor accessor, Message message)
-  {
-    Builder keyBuilder = accessor.keyBuilder();
-    if (message.getTgtName().equalsIgnoreCase("controller"))
-    {
-      // TODO: removeProperty returns boolean
-      accessor.removeProperty(keyBuilder.controllerMessage(message.getMsgId()));
-    }
-    else
-    {
-      accessor.removeProperty(keyBuilder.message(_manager.getInstanceName(),
-                                                 message.getMsgId()));
-    }
-  }
-
-  private void sendReply(HelixDataAccessor accessor,
-                         Message message,
-                         HelixTaskResult taskResult)
-  {
-    if (_message.getCorrelationId() != null
-        && !message.getMsgType().equals(MessageType.TASK_REPLY.toString()))
-    {
-      logger.info("Sending reply for message " + message.getCorrelationId());
-      _statusUpdateUtil.logInfo(message, HelixTask.class, "Sending reply", accessor);
-
-      taskResult.getTaskResultMap().put("SUCCESS", "" + taskResult.isSucess());
-      taskResult.getTaskResultMap().put("INTERRUPTED", "" + taskResult.isInterrupted());
-      if (!taskResult.isSucess())
-      {
-        taskResult.getTaskResultMap().put("ERRORINFO", taskResult.getMessage());
-      }
-      Message replyMessage =
-          Message.createReplyMessage(_message,
-                                     _manager.getInstanceName(),
-                                     taskResult.getTaskResultMap());
-      replyMessage.setSrcInstanceType(_manager.getInstanceType());
-
-      if (message.getSrcInstanceType() == InstanceType.PARTICIPANT)
-      {
-        Builder keyBuilder = accessor.keyBuilder();
-        accessor.setProperty(keyBuilder.message(message.getMsgSrc(),
-                                                replyMessage.getMsgId()),
-                             replyMessage);
-      }
-      else if (message.getSrcInstanceType() == InstanceType.CONTROLLER)
-      {
-        Builder keyBuilder = accessor.keyBuilder();
-        accessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId()),
-                             replyMessage);
-      }
-      _statusUpdateUtil.logInfo(message, HelixTask.class, "1 msg replied to "
-          + replyMessage.getTgtName(), accessor);
-    }
-  }
-
-  private void reportMessageStat(HelixManager manager,
-                                 Message message,
-                                 HelixTaskResult taskResult)
-  {
-    // report stat
-    if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
-    {
-      return;
-    }
-    long now = new Date().getTime();
-    long msgReadTime = message.getReadTimeStamp();
-    long msgExecutionStartTime = message.getExecuteStartTimeStamp();
-    if (msgReadTime != 0 && msgExecutionStartTime != 0)
-    {
-      long totalDelay = now - msgReadTime;
-      long executionDelay = now - msgExecutionStartTime;
-      if (totalDelay > 0 && executionDelay > 0)
-      {
-        String fromState = message.getFromState();
-        String toState = message.getToState();
-        String transition = fromState + "--" + toState;
-
-        StateTransitionContext cxt =
-            new StateTransitionContext(manager.getClusterName(),
-                                       manager.getInstanceName(),
-                                       message.getResourceName(),
-                                       transition);
-
-        StateTransitionDataPoint data =
-            new StateTransitionDataPoint(totalDelay,
-                                         executionDelay,
-                                         taskResult.isSucess());
-        _executor.getParticipantMonitor().reportTransitionStat(cxt, data);
-      }
-    }
-    else
-    {
-      logger.warn("message read time and start execution time not recorded.");
-    }
-  }
-
-};

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixTaskExecutor.java
deleted file mode 100644
index 84fa329..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixTaskExecutor.java
+++ /dev/null
@@ -1,638 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging.handling;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScopeBuilder;
-import com.linkedin.helix.HelixConstants;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.MessageListener;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.NotificationContext.Type;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.Attributes;
-import com.linkedin.helix.model.Message.MessageState;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.monitoring.ParticipantMonitor;
-import com.linkedin.helix.participant.HelixStateMachineEngine;
-import com.linkedin.helix.util.StatusUpdateUtil;
-
-public class HelixTaskExecutor implements MessageListener
-{
-  // TODO: we need to further design how to throttle this.
-  // From storage point of view, only bootstrap case is expensive
-  // and we need to throttle, which is mostly IO / network bounded.
-  public static final int                                DEFAULT_PARALLEL_TASKS     = 40;
-  // TODO: create per-task type threadpool with customizable pool size
-  protected final Map<String, Future<HelixTaskResult>>   _taskMap;
-  private final Object                                   _lock;
-  private final StatusUpdateUtil                         _statusUpdateUtil;
-  private final ParticipantMonitor                       _monitor;
-  public static final String                             MAX_THREADS                =
-                                                                                        "maxThreads";
-
-  final ConcurrentHashMap<String, MessageHandlerFactory> _handlerFactoryMap         =
-                                                                                        new ConcurrentHashMap<String, MessageHandlerFactory>();
-
-  final ConcurrentHashMap<String, ExecutorService>       _threadpoolMap             =
-                                                                                        new ConcurrentHashMap<String, ExecutorService>();
-
-  private static Logger                                  LOG                        =
-                                                                                        Logger.getLogger(HelixTaskExecutor.class);
-
-  Map<String, Integer>                                   _resourceThreadpoolSizeMap =
-                                                                                        new ConcurrentHashMap<String, Integer>();
-
-  final GroupMessageHandler                              _groupMsgHandler;
-
-  public HelixTaskExecutor()
-  {
-    _taskMap = new ConcurrentHashMap<String, Future<HelixTaskResult>>();
-    _groupMsgHandler = new GroupMessageHandler();
-
-    _lock = new Object();
-    _statusUpdateUtil = new StatusUpdateUtil();
-    _monitor = new ParticipantMonitor();
-    startMonitorThread();
-  }
-
-  public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory)
-  {
-    registerMessageHandlerFactory(type, factory, DEFAULT_PARALLEL_TASKS);
-  }
-
-  public void registerMessageHandlerFactory(String type,
-                                            MessageHandlerFactory factory,
-                                            int threadpoolSize)
-  {
-    if (!_handlerFactoryMap.containsKey(type))
-    {
-      if (!type.equalsIgnoreCase(factory.getMessageType()))
-      {
-        throw new HelixException("Message factory type mismatch. Type: " + type
-            + " factory : " + factory.getMessageType());
-
-      }
-      _handlerFactoryMap.put(type, factory);
-      _threadpoolMap.put(type, Executors.newFixedThreadPool(threadpoolSize));
-      LOG.info("Adding msg factory for type " + type + " threadpool size "
-          + threadpoolSize);
-    }
-    else
-    {
-      LOG.error("Ignoring duplicate msg handler factory for type " + type);
-    }
-  }
-
-  public ParticipantMonitor getParticipantMonitor()
-  {
-    return _monitor;
-  }
-
-  private void startMonitorThread()
-  {
-    // start a thread which monitors the completions of task
-  }
-
-  void checkResourceConfig(String resourceName, HelixManager manager)
-  {
-    if (!_resourceThreadpoolSizeMap.containsKey(resourceName))
-    {
-      int threadpoolSize = -1;
-      ConfigAccessor configAccessor = manager.getConfigAccessor();
-      if (configAccessor != null)
-      {
-        ConfigScope scope =
-            new ConfigScopeBuilder().forCluster(manager.getClusterName())
-                                    .forResource(resourceName)
-                                    .build();
-
-        String threadpoolSizeStr = configAccessor.get(scope, MAX_THREADS);
-        try
-        {
-          if (threadpoolSizeStr != null)
-          {
-            threadpoolSize = Integer.parseInt(threadpoolSizeStr);
-          }
-        }
-        catch (Exception e)
-        {
-          LOG.error("", e);
-        }
-      }
-      if (threadpoolSize > 0)
-      {
-        String key = MessageType.STATE_TRANSITION.toString() + "." + resourceName;
-        _threadpoolMap.put(key, Executors.newFixedThreadPool(threadpoolSize));
-        LOG.info("Adding per resource threadpool for resource " + resourceName
-            + " with size " + threadpoolSize);
-      }
-      _resourceThreadpoolSizeMap.put(resourceName, threadpoolSize);
-    }
-  }
-
-  /**
-   * Find the executor service for the message. A message can have a per-statemodelfactory
-   * executor service, or per-message type executor service.
-   * 
-   **/
-  ExecutorService findExecutorServiceForMsg(Message message)
-  {
-    ExecutorService executorService = _threadpoolMap.get(message.getMsgType());
-    if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
-    {
-      String resourceName = message.getResourceName();
-      if (resourceName != null)
-      {
-        String key = message.getMsgType() + "." + resourceName;
-        if (_threadpoolMap.containsKey(key))
-        {
-          LOG.info("Find per-resource thread pool with key " + key);
-          executorService = _threadpoolMap.get(key);
-        }
-      }
-    }
-    return executorService;
-  }
-
-  public void scheduleTask(Message message,
-                           MessageHandler handler,
-                           NotificationContext notificationContext)
-  {
-    assert (handler != null);
-    synchronized (_lock)
-    {
-      try
-      {
-        String taskId = message.getMsgId() + "/" + message.getPartitionName();
-
-        if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
-        {
-          checkResourceConfig(message.getResourceName(), notificationContext.getManager());
-        }
-        LOG.info("Scheduling message: " + taskId);
-        // System.out.println("sched msg: " + message.getPartitionName() + "-"
-        // + message.getTgtName() + "-" + message.getFromState() + "-"
-        // + message.getToState());
-
-        _statusUpdateUtil.logInfo(message,
-                                  HelixTaskExecutor.class,
-                                  "Message handling task scheduled",
-                                  notificationContext.getManager().getHelixDataAccessor());
-
-        HelixTask task = new HelixTask(message, notificationContext, handler, this);
-        if (!_taskMap.containsKey(taskId))
-        {
-          LOG.info("Message:" + taskId + " handling task scheduled");
-          Future<HelixTaskResult> future =
-              findExecutorServiceForMsg(message).submit(task);
-          _taskMap.put(taskId, future);
-        }
-        else
-        {
-          _statusUpdateUtil.logWarning(message,
-                                       HelixTaskExecutor.class,
-                                       "Message handling task already sheduled for "
-                                           + taskId,
-                                       notificationContext.getManager()
-                                                          .getHelixDataAccessor());
-        }
-      }
-      catch (Exception e)
-      {
-        LOG.error("Error while executing task." + message, e);
-
-        _statusUpdateUtil.logError(message,
-                                   HelixTaskExecutor.class,
-                                   e,
-                                   "Error while executing task " + e,
-                                   notificationContext.getManager()
-                                                      .getHelixDataAccessor());
-      }
-    }
-  }
-
-  public void cancelTask(Message message, NotificationContext notificationContext)
-  {
-    synchronized (_lock)
-    {
-      String taskId = message.getMsgId() + "/" + message.getPartitionName();
-
-      if (_taskMap.containsKey(taskId))
-      {
-        _statusUpdateUtil.logInfo(message,
-                                  HelixTaskExecutor.class,
-                                  "Trying to cancel the future for " + taskId,
-                                  notificationContext.getManager().getHelixDataAccessor());
-        Future<HelixTaskResult> future = _taskMap.get(taskId);
-
-        // If the thread is still running it will be interrupted if cancel(true)
-        // is called. So state transition callbacks should implement logic to
-        // return
-        // if it is interrupted.
-        if (future.cancel(true))
-        {
-          _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceled "
-              + taskId, notificationContext.getManager().getHelixDataAccessor());
-          _taskMap.remove(taskId);
-        }
-        else
-        {
-          _statusUpdateUtil.logInfo(message,
-                                    HelixTaskExecutor.class,
-                                    "false when trying to cancel the message " + taskId,
-                                    notificationContext.getManager()
-                                                       .getHelixDataAccessor());
-        }
-      }
-      else
-      {
-        _statusUpdateUtil.logWarning(message,
-                                     HelixTaskExecutor.class,
-                                     "Future not found when trying to cancel " + taskId,
-                                     notificationContext.getManager()
-                                                        .getHelixDataAccessor());
-      }
-    }
-  }
-
-  protected void reportCompletion(Message message) // String msgId)
-  {
-    synchronized (_lock)
-    {
-      String taskId = message.getMsgId() + "/" + message.getPartitionName();
-      LOG.info("message finished: " + taskId + ", took "
-          + (new Date().getTime() - message.getExecuteStartTimeStamp()));
-      if (_taskMap.containsKey(taskId))
-      {
-        _taskMap.remove(taskId);
-      }
-      else
-      {
-        LOG.warn("message " + taskId + "not found in task map");
-      }
-    }
-  }
-
-  private void updateMessageState(List<Message> readMsgs,
-                                  HelixDataAccessor accessor,
-                                  String instanceName)
-  {
-    Builder keyBuilder = accessor.keyBuilder();
-    List<PropertyKey> readMsgKeys = new ArrayList<PropertyKey>();
-    for (Message msg : readMsgs)
-    {
-      readMsgKeys.add(msg.getKey(keyBuilder, instanceName));
-    }
-    accessor.setChildren(readMsgKeys, readMsgs);
-  }
-
-  @Override
-  public void onMessage(String instanceName,
-                        List<Message> messages,
-                        NotificationContext changeContext)
-  {
-    // If FINALIZE notification comes, reset all handler factories
-    // and terminate all the thread pools
-    // TODO: see if we should have a separate notification call for resetting
-    if (changeContext.getType() == Type.FINALIZE)
-    {
-      LOG.info("Get FINALIZE notification");
-      for (MessageHandlerFactory factory : _handlerFactoryMap.values())
-      {
-        factory.reset();
-      }
-      // Cancel all scheduled future
-      // synchronized (_lock)
-      {
-        for (Future<HelixTaskResult> f : _taskMap.values())
-        {
-          f.cancel(true);
-        }
-        _taskMap.clear();
-      }
-      return;
-    }
-
-    HelixManager manager = changeContext.getManager();
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = accessor.keyBuilder();
-
-    if (messages == null || messages.size() == 0)
-    {
-      LOG.info("No Messages to process");
-      return;
-    }
-
-    // sort message by creation timestamp, so message created earlier is processed first
-    Collections.sort(messages, Message.CREATE_TIME_COMPARATOR);
-
-    // message handlers created
-    List<MessageHandler> handlers = new ArrayList<MessageHandler>();
-
-    // message read
-    List<Message> readMsgs = new ArrayList<Message>();
-
-    String sessionId = manager.getSessionId();
-    List<String> curResourceNames =
-        accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
-    List<PropertyKey> createCurStateKeys = new ArrayList<PropertyKey>();
-    List<CurrentState> metaCurStates = new ArrayList<CurrentState>();
-    Set<String> createCurStateNames = new HashSet<String>();
-
-    changeContext.add(NotificationContext.TASK_EXECUTOR_KEY, this);
-    for (Message message : messages)
-    {
-      // nop messages are simply removed. It is used to trigger onMessage() in
-      // situations such as register a new message handler factory
-      if (message.getMsgType().equalsIgnoreCase(MessageType.NO_OP.toString()))
-      {
-        LOG.info("Dropping NO-OP message. mid: " + message.getId() + ", from: "
-            + message.getMsgSrc());
-        accessor.removeProperty(message.getKey(keyBuilder, instanceName));
-        continue;
-      }
-
-      String tgtSessionId = message.getTgtSessionId();
-
-      // if sessionId not match, remove it
-      if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*"))
-      {
-        String warningMessage =
-            "SessionId does NOT match. expected sessionId: " + sessionId
-                + ", tgtSessionId in message: " + tgtSessionId + ", messageId: "
-                + message.getMsgId();
-        LOG.warn(warningMessage);
-        accessor.removeProperty(message.getKey(keyBuilder, instanceName));
-        _statusUpdateUtil.logWarning(message,
-                                     HelixStateMachineEngine.class,
-                                     warningMessage,
-                                     accessor);
-        continue;
-      }
-
-      // don't process message that is of READ or UNPROCESSABLE state
-      if (MessageState.NEW != message.getMsgState())
-      {
-        // It happens because we don't delete message right after
-        // read. Instead we keep it until the current state is updated.
-        // We will read the message again if there is a new message but we
-        // check for the status and ignore if its already read
-        LOG.trace("Message already read. mid: " + message.getMsgId());
-        continue;
-      }
-
-      // create message handlers, if handlers not found, leave its state as NEW
-      try
-      {
-        List<MessageHandler> createHandlers =
-            createMessageHandlers(message, changeContext);
-        if (createHandlers.isEmpty())
-        {
-          continue;
-        }
-        handlers.addAll(createHandlers);
-      }
-      catch (Exception e)
-      {
-        LOG.error("Failed to create message handler for " + message.getMsgId(), e);
-        String error =
-            "Failed to create message handler for " + message.getMsgId()
-                + ", exception: " + e;
-
-        _statusUpdateUtil.logError(message,
-                                   HelixStateMachineEngine.class,
-                                   e,
-                                   error,
-                                   accessor);
-
-        // Mark message state UNPROCESSABLE if we hit an exception in creating
-        // message handler. The message will stay on zookeeper but will not be processed
-        message.setMsgState(MessageState.UNPROCESSABLE);
-        accessor.updateProperty(message.getKey(keyBuilder, instanceName), message);
-        continue;
-      }
-
-      // update msgState to read
-      message.setMsgState(MessageState.READ);
-      message.setReadTimeStamp(new Date().getTime());
-      message.setExecuteSessionId(changeContext.getManager().getSessionId());
-
-      _statusUpdateUtil.logInfo(message,
-                                HelixStateMachineEngine.class,
-                                "New Message",
-                                accessor);
-
-      readMsgs.add(message);
-
-      // batch creation of all current state meta data
-      // do it for non-controller and state transition messages only
-      if (!message.isControlerMsg()
-          && message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString()))
-      {
-        String resourceName = message.getResourceName();
-        if (!curResourceNames.contains(resourceName)
-            && !createCurStateNames.contains(resourceName))
-        {
-          createCurStateNames.add(resourceName);
-          createCurStateKeys.add(keyBuilder.currentState(instanceName,
-                                                         sessionId,
-                                                         resourceName));
-
-          CurrentState metaCurState = new CurrentState(resourceName);
-          metaCurState.setBucketSize(message.getBucketSize());
-          metaCurState.setStateModelDefRef(message.getStateModelDef());
-          metaCurState.setSessionId(sessionId);
-          metaCurState.setGroupMessageMode(message.getGroupMessageMode());
-          String ftyName = message.getStateModelFactoryName();
-          if (ftyName != null)
-          {
-            metaCurState.setStateModelFactoryName(ftyName);
-          }
-          else
-          {
-            metaCurState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
-          }
-
-          metaCurStates.add(metaCurState);
-        }
-      }
-    }
-
-    // batch create curState meta
-    if (createCurStateKeys.size() > 0)
-    {
-      try
-      {
-        accessor.createChildren(createCurStateKeys, metaCurStates);
-      }
-      catch (Exception e)
-      {
-        LOG.error(e);
-      }
-    }
-
-    // update message state to READ in batch and schedule all read messages
-    if (readMsgs.size() > 0)
-    {
-      updateMessageState(readMsgs, accessor, instanceName);
-
-      for (MessageHandler handler : handlers)
-      {
-        scheduleTask(handler._message, handler, changeContext);
-      }
-    }
-  }
-
-  private MessageHandler createMessageHandler(Message message,
-                                              NotificationContext changeContext)
-  {
-    String msgType = message.getMsgType().toString();
-
-    MessageHandlerFactory handlerFactory = _handlerFactoryMap.get(msgType);
-
-    // Fail to find a MessageHandlerFactory for the message
-    // we will keep the message and the message will be handled when
-    // the corresponding MessageHandlerFactory is registered
-    if (handlerFactory == null)
-    {
-      LOG.warn("Fail to find message handler factory for type: " + msgType + " mid:"
-          + message.getMsgId());
-      return null;
-    }
-
-    return handlerFactory.createHandler(message, changeContext);
-  }
-
-  private List<MessageHandler> createMessageHandlers(Message message,
-                                                     NotificationContext changeContext)
-  {
-    List<MessageHandler> handlers = new ArrayList<MessageHandler>();
-    if (!message.getGroupMessageMode())
-    {
-      LOG.info("Creating handler for message " + message.getMsgId() + "/"
-          + message.getPartitionName());
-
-      MessageHandler handler = createMessageHandler(message, changeContext);
-
-      if (handler != null)
-      {
-        handlers.add(handler);
-      }
-    }
-    else
-    {
-      _groupMsgHandler.put(message);
-
-      List<String> partitionNames = message.getPartitionNames();
-      for (String partitionName : partitionNames)
-      {
-        Message subMsg = new Message(message.getRecord());
-        subMsg.setPartitionName(partitionName);
-        subMsg.setAttribute(Attributes.PARENT_MSG_ID, message.getId());
-
-        LOG.info("Creating handler for group message " + subMsg.getMsgId() + "/"
-            + partitionName);
-        MessageHandler handler = createMessageHandler(subMsg, changeContext);
-        if (handler != null)
-        {
-          handlers.add(handler);
-        }
-      }
-    }
-
-    return handlers;
-  }
-
-  public void shutDown()
-  {
-    LOG.info("shutting down TaskExecutor");
-    synchronized (_lock)
-    {
-      for (String msgType : _threadpoolMap.keySet())
-      {
-        List<Runnable> tasksLeft = _threadpoolMap.get(msgType).shutdownNow();
-        LOG.info(tasksLeft.size() + " tasks are still in the threadpool for msgType "
-            + msgType);
-      }
-      for (String msgType : _threadpoolMap.keySet())
-      {
-        try
-        {
-          if (!_threadpoolMap.get(msgType).awaitTermination(200, TimeUnit.MILLISECONDS))
-          {
-            LOG.warn(msgType + " is not fully termimated in 200 MS");
-            System.out.println(msgType + " is not fully termimated in 200 MS");
-          }
-        }
-        catch (InterruptedException e)
-        {
-          LOG.error("Interrupted", e);
-        }
-      }
-    }
-    _monitor.shutDown();
-    LOG.info("shutdown finished");
-  }
-
-  // TODO: remove this
-  public static void main(String[] args) throws Exception
-  {
-    ExecutorService pool = Executors.newFixedThreadPool(DEFAULT_PARALLEL_TASKS);
-    Future<HelixTaskResult> future;
-    future = pool.submit(new Callable<HelixTaskResult>()
-    {
-
-      @Override
-      public HelixTaskResult call() throws Exception
-      {
-        System.out.println("CMTaskExecutor.main(...).new Callable() {...}.call()");
-        return null;
-      }
-
-    });
-    future = pool.submit(new HelixTask(null, null, null, null));
-    Thread.currentThread().join();
-    System.out.println(future.isDone());
-  }
-}