You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/02/05 22:06:14 UTC

[2/3] git commit: HELIX-42: refactor batch message handling

HELIX-42: refactor batch message handling


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/a1bf1244
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/a1bf1244
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/a1bf1244

Branch: refs/heads/master
Commit: a1bf1244f18a0b43bc53aabe702a03516df72fc0
Parents: 1ec3ac6
Author: zzhang <zz...@uci.edu>
Authored: Tue Feb 5 13:06:04 2013 -0800
Committer: zzhang <zz...@uci.edu>
Committed: Tue Feb 5 13:06:04 2013 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/helix/HelixProperty.java  |   10 +-
 .../java/org/apache/helix/NotificationContext.java |    8 +-
 .../stages/CurrentStateComputationStage.java       |    2 +-
 .../stages/ResourceComputationStage.java           |    4 +-
 .../controller/stages/TaskAssignmentStage.java     |    2 +-
 .../apache/helix/manager/zk/ZKHelixManager.java    |    2 +-
 .../messaging/handling/BatchMessageHandler.java    |  228 +++++++++
 .../messaging/handling/CurrentStateUpdate.java     |   40 ++
 .../messaging/handling/HelixBatchMessageTask.java  |  121 +++++
 .../handling/HelixStateTransitionHandler.java      |   73 ++--
 .../apache/helix/messaging/handling/HelixTask.java |  241 ++++------
 .../messaging/handling/HelixTaskExecutor.java      |  364 ++++++++-------
 .../helix/messaging/handling/MessageTask.java      |   36 ++
 .../helix/messaging/handling/MessageTaskInfo.java  |   41 ++
 .../messaging/handling/MessageTimeoutTask.java     |   50 ++
 .../helix/messaging/handling/TaskExecutor.java     |   92 ++++
 .../helix/participant/HelixStateMachineEngine.java |  104 +++--
 .../statemachine/StateModelFactory.java            |    6 +-
 .../src/test/java/org/apache/helix/Mocks.java      |    9 +-
 .../org/apache/helix/TestHelixTaskExecutor.java    |   16 +-
 .../org/apache/helix/TestHelixTaskHandler.java     |    4 +-
 .../apache/helix/integration/TestBatchMessage.java |  298 ++++++++++++
 .../apache/helix/integration/TestGroupMessage.java |  213 ---------
 .../handling/TestConfigThreadpoolSize.java         |    4 +-
 .../messaging/handling/TestHelixTaskExecutor.java  |   41 ++-
 .../handling/TestResourceThreadpoolSize.java       |    2 +-
 26 files changed, 1380 insertions(+), 631 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/HelixProperty.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixProperty.java b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
index 080ee58..31ac50a 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
@@ -35,7 +35,7 @@ public class HelixProperty
   public enum HelixPropertyAttribute
   {
     BUCKET_SIZE,
-    GROUP_MESSAGE_MODE
+    BATCH_MESSAGE_MODE
   }
 
   protected final ZNRecord _record;
@@ -178,16 +178,16 @@ public class HelixProperty
     return records;
   }
 
-  public void setGroupMessageMode(boolean enable)
+  public void setBatchMessageMode(boolean enable)
   {
-    _record.setSimpleField(HelixPropertyAttribute.GROUP_MESSAGE_MODE.toString(), ""
+    _record.setSimpleField(HelixPropertyAttribute.BATCH_MESSAGE_MODE.toString(), ""
         + enable);
   }
 
-  public boolean getGroupMessageMode()
+  public boolean getBatchMessageMode()
   {
     String enableStr =
-        _record.getSimpleField(HelixPropertyAttribute.GROUP_MESSAGE_MODE.toString());
+        _record.getSimpleField(HelixPropertyAttribute.BATCH_MESSAGE_MODE.toString());
     if (enableStr == null)
     {
       return false;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/NotificationContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/NotificationContext.java b/helix-core/src/main/java/org/apache/helix/NotificationContext.java
index 0ddc15c..f683e77 100644
--- a/helix-core/src/main/java/org/apache/helix/NotificationContext.java
+++ b/helix-core/src/main/java/org/apache/helix/NotificationContext.java
@@ -24,8 +24,12 @@ import java.util.Map;
 
 public class NotificationContext
 {
-  // keys used for object map
-  public static final String TASK_EXECUTOR_KEY = "TASK_EXECUTOR";
+	// keys used for object map
+	public enum MapKey {
+		TASK_EXECUTOR,
+		CURRENT_STATE_UPDATE,
+		HELIX_TASK_RESULT
+	}
   
   private Map<String, Object> _map;
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index be501dd..6612ee0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -79,7 +79,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage
           continue;
         }
 
-        if (!message.getGroupMessageMode())
+        if (!message.getBatchMessageMode())
         {
           String partitionName = message.getPartitionName();
           Partition partition = resource.getPartition(partitionName);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 663eb2a..00ba8eb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -71,7 +71,7 @@ public class ResourceComputationStage extends AbstractBaseStage
           resource.setStateModelDefRef(idealState.getStateModelDefRef());
           resource.setStateModelFactoryName(idealState.getStateModelFactoryName());
           resource.setBucketSize(idealState.getBucketSize());
-          resource.setGroupMessageMode(idealState.getGroupMessageMode());
+          resource.setGroupMessageMode(idealState.getBatchMessageMode());
         }
       }
     }
@@ -107,7 +107,7 @@ public class ResourceComputationStage extends AbstractBaseStage
             resource.setStateModelDefRef(currentState.getStateModelDefRef());
             resource.setStateModelFactoryName(currentState.getStateModelFactoryName());
             resource.setBucketSize(currentState.getBucketSize());
-            resource.setGroupMessageMode(currentState.getGroupMessageMode());
+            resource.setGroupMessageMode(currentState.getBatchMessageMode());
           }
           
           if (currentState.getStateModelDefRef() == null)

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index 822fcb8..0a6c250 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -110,7 +110,7 @@ public class TaskAssignmentStage extends AbstractBaseStage
       if (!groupMessages.containsKey(key))
       {
         Message groupMessage = new Message(message.getRecord());
-        groupMessage.setGroupMessageMode(true);
+        groupMessage.setBatchMessageMode(true);
         outputMessages.add(groupMessage);
         groupMessages.put(key, groupMessage);
       }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 1ba8985..3d13cd0 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -456,7 +456,7 @@ public class ZKHelixManager implements HelixManager
      * shutdown thread pool first to avoid reset() being invoked in the middle of state
      * transition
      */
-    _messagingService.getExecutor().shutDown();
+    _messagingService.getExecutor().shutdown();
     resetHandlers();
 
     _helixAccessor.shutdown();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java
new file mode 100644
index 0000000..7ae0c38
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java
@@ -0,0 +1,228 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.NotificationContext.MapKey;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.Attributes;
+import org.apache.log4j.Logger;
+
+/**
+ * Handles a batch message: splits it into one sub-message per partition name,
+ * runs the sub-message handlers through the {@link TaskExecutor}, and finally
+ * writes the merged current-state updates in {@link #postHandleMessage()}.
+ */
+public class BatchMessageHandler extends MessageHandler {
+	private static final Logger LOG = Logger.getLogger(BatchMessageHandler.class);
+
+	final MessageHandlerFactory _msgHandlerFty;
+	final TaskExecutor _executor;
+	final List<Message> _subMessages;
+	final List<MessageHandler> _subMessageHandlers;
+
+	/**
+	 * @param msg      the batch message; one sub-message is created per partition name
+	 * @param context  notification context handed to every sub-message handler
+	 * @param fty      factory that creates one handler per sub-message
+	 * @param executor executor the sub-message tasks are submitted to
+	 * @throws HelixException if {@code fty} or {@code executor} is null
+	 */
+	public BatchMessageHandler(Message msg, NotificationContext context, MessageHandlerFactory fty,
+	        TaskExecutor executor) {
+		super(msg, context);
+
+		if (fty == null || executor == null) {
+			throw new HelixException("MessageHandlerFactory | TaskExecutor can't be null");
+		}
+
+		_msgHandlerFty = fty;
+		_executor = executor;
+		// create sub-messages
+		_subMessages = new ArrayList<Message>();
+		for (String partitionKey : _message.getPartitionNames()) {
+			// assign a new message id, put batch-msg-id to parent-id field
+			Message subMsg = new Message(_message.getRecord(), UUID.randomUUID().toString());
+			subMsg.setPartitionName(partitionKey);
+			subMsg.setAttribute(Attributes.PARENT_MSG_ID, _message.getId());
+			subMsg.setBatchMessageMode(false);
+			_subMessages.add(subMsg);
+		}
+		// create sub-message handlers
+		_subMessageHandlers = createMsgHandlers(_subMessages, context);
+	}
+
+	/** Creates one handler per message via the factory, in the order of {@code msgs}. */
+	List<MessageHandler> createMsgHandlers(List<Message> msgs, NotificationContext context) {
+		List<MessageHandler> handlers = new ArrayList<MessageHandler>(msgs.size());
+		for (Message msg : msgs) {
+			handlers.add(_msgHandlerFty.createHandler(msg, context));
+		}
+		return handlers;
+	}
+
+	public void preHandleMessage() {
+		// TODO add batch-message-handler.start() here
+	}
+
+	/**
+	 * Merges the current-state updates stored in the notification context under
+	 * {@code MapKey.CURRENT_STATE_UPDATE} and writes one update per property key.
+	 * Does nothing when the context holds no update map.
+	 */
+	@SuppressWarnings("unchecked")
+	public void postHandleMessage() {
+		// TODO add batch-message-handler.end() here
+		// update currentState
+		HelixDataAccessor accessor = _notificationContext.getManager().getHelixDataAccessor();
+		ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap = (ConcurrentHashMap<String, CurrentStateUpdate>) _notificationContext
+		        .get(MapKey.CURRENT_STATE_UPDATE.toString());
+		if (csUpdateMap == null) {
+			return;
+		}
+
+		// TODO: change to use asyncSet
+		for (Map.Entry<PropertyKey, CurrentState> entry : mergeCurStateUpdate(csUpdateMap).entrySet()) {
+			accessor.updateProperty(entry.getKey(), entry.getValue());
+		}
+	}
+
+	/**
+	 * Executes all sub-messages; will not return until all sub-message executions are done.
+	 * @return a result marked successful only if every sub-task reported success
+	 */
+	@Override
+	public HelixTaskResult handleMessage() {
+		HelixTaskResult result = null;
+		List<Future<HelixTaskResult>> futures = null;
+		List<MessageTask> batchTasks = new ArrayList<MessageTask>();
+
+		// TODO sync on resource level
+		try {
+			preHandleMessage();
+
+			int exeBatchSize = 1; // TODO: getExeBatchSize from msg
+			int total = _subMessages.size();
+			for (int i = 0; i < total; i += exeBatchSize) {
+				int end = Math.min(i + exeBatchSize, total); // clamp: the last chunk may be shorter
+				batchTasks.add(new HelixBatchMessageTask(_message, _subMessages.subList(i, end),
+				        _subMessageHandlers.subList(i, end), _notificationContext));
+			}
+
+			// invokeAll() is blocking call
+			long timeout = _message.getExecutionTimeout();
+			if (timeout == -1) {
+				timeout = Long.MAX_VALUE;
+			}
+			futures = _executor.invokeAllTasks(batchTasks, timeout, TimeUnit.MILLISECONDS);
+		} catch (Exception e) {
+			// HelixTask will call onError on this batch-msg-handler
+			LOG.error("fail to execute batchMsg: " + _message.getId(), e);
+			result = new HelixTaskResult();
+			result.setSuccess(false);
+			result.setException(e);
+		}
+
+		// combine sub-results to result
+		boolean interrupted = false;
+		if (futures != null) {
+			boolean isBatchTaskSucceed = true;
+			for (int i = 0; i < futures.size(); i++) {
+				MessageTask subTask = batchTasks.get(i);
+				try {
+					if (!futures.get(i).get().isSucess()) {
+						isBatchTaskSucceed = false;
+					}
+				} catch (InterruptedException e) {
+					isBatchTaskSucceed = false;
+					interrupted = true;
+					LOG.error("interrupted in executing batch-msg: " + _message.getId() + ", sub-msg: " + subTask.getTaskId(), e);
+				} catch (ExecutionException e) {
+					isBatchTaskSucceed = false;
+					LOG.error("fail to execute batch-msg: " + _message.getId() + ", sub-msg: " + subTask.getTaskId(), e);
+				} catch (java.util.concurrent.CancellationException e) {
+					// presumably raised for sub-tasks cancelled on timeout -- TODO confirm against TaskExecutor
+					isBatchTaskSucceed = false;
+					LOG.error("cancelled in executing batch-msg: " + _message.getId() + ", sub-msg: " + subTask.getTaskId(), e);
+				}
+			}
+			result = new HelixTaskResult();
+			result.setSuccess(isBatchTaskSucceed);
+		}
+
+		try {
+			// pass task-result to post-handle-msg
+			_notificationContext.add(MapKey.HELIX_TASK_RESULT.toString(), result);
+			postHandleMessage();
+		} finally {
+			if (interrupted) {
+				Thread.currentThread().interrupt(); // re-assert only after the current-state write
+			}
+		}
+		return result;
+	}
+
+	@Override
+	public void onError(Exception e, ErrorCode code, ErrorType type) {
+		// if one sub-message execution fails, call onError on all sub-message handlers
+		for (MessageHandler handler : _subMessageHandlers) {
+			handler.onError(e, code, type);
+		}
+	}
+
+	/**
+	 * Groups the updates by property-key path, merging deltas that share a path
+	 * into the first update seen for that path (that update's delta is mutated).
+	 */
+	// TODO: optimize this based on the fact that each cs update is for a distinct partition
+	private Map<PropertyKey, CurrentState> mergeCurStateUpdate(
+	        ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap) {
+		Map<String, CurrentStateUpdate> mergedByPath = new HashMap<String, CurrentStateUpdate>();
+		for (CurrentStateUpdate update : csUpdateMap.values()) {
+			String path = update._key.getPath(); // TODO: this is time consuming, optimize it
+			CurrentStateUpdate first = mergedByPath.get(path);
+			if (first == null) {
+				mergedByPath.put(path, update);
+			} else {
+				first.merge(update._delta);
+			}
+		}
+		Map<PropertyKey, CurrentState> ret = new HashMap<PropertyKey, CurrentState>();
+		for (CurrentStateUpdate update : mergedByPath.values()) {
+			ret.put(update._key, update._delta);
+		}
+		return ret;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/CurrentStateUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/CurrentStateUpdate.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/CurrentStateUpdate.java
new file mode 100644
index 0000000..ecce683
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/CurrentStateUpdate.java
@@ -0,0 +1,40 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+
+/** Pairs a current-state property key with the current-state delta recorded for it. */
+public class CurrentStateUpdate {
+	final PropertyKey _key;
+	final CurrentState _delta;
+
+	/** Binds {@code delta} to {@code key}; both references are stored as given. */
+	CurrentStateUpdate(PropertyKey key, CurrentState delta) {
+		this._key = key;
+		this._delta = delta;
+	}
+
+	/** Merges the record of {@code anotherDelta} into this update's own delta record. */
+	public void merge(CurrentState anotherDelta) {
+		_delta.getRecord().merge(anotherDelta.getRecord());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
new file mode 100644
index 0000000..f4c82b3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
@@ -0,0 +1,121 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+
+/** Runs a chunk of one batch message's sub-message handlers sequentially as a single task. */
+public class HelixBatchMessageTask implements MessageTask {
+	private static final Logger LOG = Logger.getLogger(HelixBatchMessageTask.class);
+
+	final NotificationContext _context;
+	final Message _batchMsg;
+	final List<Message> _subMsgs;
+	final List<MessageHandler> _handlers;
+
+	/**
+	 * @param batchMsg the parent batch message
+	 * @param subMsgs  sub-messages covered by this task; only their ids are read, for the task id
+	 * @param handlers handlers invoked by {@link #call()}; null entries are skipped
+	 * @param context  notification context returned by {@link #getNotificationContext()}
+	 */
+	public HelixBatchMessageTask(Message batchMsg, List<Message> subMsgs, List<MessageHandler> handlers,
+	        NotificationContext context) {
+		_batchMsg = batchMsg;
+		_subMsgs = subMsgs;
+		_handlers = handlers;
+		_context = context;
+	}
+
+	/**
+	 * Invokes every non-null handler in order. An exception raised by a handler stops
+	 * the loop and is returned inside the result instead of being propagated.
+	 * @return a result on which success is set to true only if every handler reported success
+	 */
+	@Override
+	public HelixTaskResult call() throws Exception {
+		long start = System.currentTimeMillis();
+		LOG.info("taskId:" + getTaskId() + " handling task begin, at: " + start);
+		boolean allSucceeded = true;
+		try {
+			for (MessageHandler handler : _handlers) {
+				if (handler == null) {
+					continue;
+				}
+				// if any fails, return false
+				if (!handler.handleMessage().isSucess()) {
+					allSucceeded = false;
+				}
+			}
+		} catch (Exception e) {
+			LOG.error("Exception while executing task: " + getTaskId(), e);
+			HelixTaskResult failure = new HelixTaskResult();
+			failure.setException(e);
+			failure.setMessage(e.getMessage());
+			return failure;
+		}
+
+		if (allSucceeded) {
+			LOG.info("task: " + getTaskId() + " completed sucessfully");
+		}
+
+		HelixTaskResult taskResult = new HelixTaskResult();
+		taskResult.setSuccess(allSucceeded);
+		return taskResult;
+	}
+
+	/** Builds the id as the batch message id, a slash, then the list of sub-message ids. */
+	@Override
+	public String getTaskId() {
+		String prefix = _batchMsg.getId() + "/";
+		List<String> subMsgIds = new ArrayList<String>();
+		if (_subMsgs != null) {
+			for (Message subMsg : _subMsgs) {
+				subMsgIds.add(subMsg.getId());
+			}
+		}
+		return prefix + subMsgIds;
+	}
+
+	@Override
+	public Message getMessage() {
+		return _batchMsg;
+	}
+
+	@Override
+	public NotificationContext getNotificationContext() {
+		return _context;
+	}
+
+	/** Forwards the timeout notification to every non-null handler. */
+	@Override
+	public void onTimeout() {
+		for (MessageHandler handler : _handlers) {
+			if (handler != null) {
+				handler.onTimeout();
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 4616123..20b2ff1 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -25,11 +25,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.NotificationContext.MapKey;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecordBucketizer;
 import org.apache.helix.ZNRecordDelta;
@@ -37,6 +39,7 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecordDelta.MergeOperation;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.Attributes;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelParser;
 import org.apache.helix.participant.statemachine.StateTransitionError;
@@ -60,25 +63,24 @@ public class HelixStateTransitionHandler extends MessageHandler
   private final StateModelParser _transitionMethodFinder;
   private final CurrentState     _currentStateDelta;
   volatile boolean               _isTimeout = false;
-  private final HelixTaskExecutor              _executor;
 
   public HelixStateTransitionHandler(StateModel stateModel,
                                      Message message,
                                      NotificationContext context,
-                                     CurrentState currentStateDelta,
-                                     HelixTaskExecutor executor)
+                                     CurrentState currentStateDelta)
   {
     super(message, context);
     _stateModel = stateModel;
     _statusUpdateUtil = new StatusUpdateUtil();
     _transitionMethodFinder = new StateModelParser();
     _currentStateDelta = currentStateDelta;
-    _executor = executor;
   }
 
-  private void prepareMessageExecution(HelixManager manager, Message message) throws HelixException,
-      HelixStateMismatchException
+  void preHandleMessage() throws Exception
   {
+	Message message = _message;
+	HelixManager manager = _notificationContext.getManager();
+
     if (!message.isValid())
     {
       String errorMessage =
@@ -93,7 +95,7 @@ public class HelixStateTransitionHandler extends MessageHandler
       logger.error(errorMessage);
       throw new HelixException(errorMessage);
     }
-    // DataAccessor accessor = manager.getDataAccessor();
+    
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
 
     String partitionName = message.getPartitionName();
@@ -119,12 +121,13 @@ public class HelixStateTransitionHandler extends MessageHandler
     }
   }
 
-  void postExecutionMessage(HelixManager manager,
-                            Message message,
-                            NotificationContext context,
-                            HelixTaskResult taskResult,
-                            Exception exception)
+  void postHandleMessage()
   {
+	Message message = _message;
+	HelixManager manager = _notificationContext.getManager();
+	HelixTaskResult taskResult = (HelixTaskResult) _notificationContext.get(MapKey.HELIX_TASK_RESULT.toString());
+	Exception exception = taskResult.getException();
+		
     String partitionKey = message.getPartitionName();
     String resource = message.getResourceName();
     String sessionId = message.getTgtSessionId();
@@ -209,7 +212,7 @@ public class HelixStateTransitionHandler extends MessageHandler
               return;
             }
           }
-          _stateModel.rollbackOnError(message, context, error);
+          _stateModel.rollbackOnError(message, _notificationContext, error);
           _currentStateDelta.setState(partitionKey, "ERROR");
           _stateModel.updateState("ERROR");
         }
@@ -222,32 +225,39 @@ public class HelixStateTransitionHandler extends MessageHandler
                               sessionId,
                               resource,
                               bucketizer.getBucketName(partitionKey));
-      if (!_message.getGroupMessageMode())
+      if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null)
       {
+    	// normal message
         accessor.updateProperty(key, _currentStateDelta);
       }
       else
       {
-        _executor._groupMsgHandler.addCurStateUpdate(_message, key, _currentStateDelta);
+    	// sub-message of a batch message
+        ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap 
+          = (ConcurrentHashMap<String, CurrentStateUpdate>) _notificationContext.get(MapKey.CURRENT_STATE_UPDATE.toString());
+        csUpdateMap.put(partitionKey, new CurrentStateUpdate(key, _currentStateDelta));
       }
     }
     catch (Exception e)
     {
-      logger.error("Error when updating the state ", e);
+      logger.error("Error when updating current-state ", e);
       StateTransitionError error =
           new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
-      _stateModel.rollbackOnError(message, context, error);
+      _stateModel.rollbackOnError(message, _notificationContext, error);
       _statusUpdateUtil.logError(message,
                                  HelixStateTransitionHandler.class,
                                  e,
-                                 "Error when update the state ",
+                                 "Error when update current-state ",
                                  accessor);
     }
   }
 
-  public HelixTaskResult handleMessageInternal(Message message,
-                                               NotificationContext context)
+  @Override
+  public HelixTaskResult handleMessage()
   {
+	NotificationContext context = _notificationContext;
+	Message message = _message;
+		
     synchronized (_stateModel)
     {
       HelixTaskResult taskResult = new HelixTaskResult();
@@ -260,10 +270,11 @@ public class HelixStateTransitionHandler extends MessageHandler
                                 accessor);
       message.setExecuteStartTimeStamp(new Date().getTime());
 
-      Exception exception = null;
+      // Exception exception = null;
       try
       {
-        prepareMessageExecution(manager, message);
+    	preHandleMessage();
+        // prepareMessageExecution(manager, message);
         invoke(accessor, context, taskResult, message);
       }
       catch (HelixStateMismatchException e)
@@ -273,7 +284,7 @@ public class HelixStateTransitionHandler extends MessageHandler
         taskResult.setSuccess(false);
         taskResult.setMessage(e.toString());
         taskResult.setException(e);
-        exception = e;
+        // exception = e;
         // return taskResult;
       }
       catch (Exception e)
@@ -295,9 +306,13 @@ public class HelixStateTransitionHandler extends MessageHandler
         taskResult.setMessage(e.toString());
         taskResult.setException(e);
         taskResult.setInterrupted(e instanceof InterruptedException);
-        exception = e;
+        // exception = e;
       }
-      postExecutionMessage(manager, message, context, taskResult, exception);
+//      postExecutionMessage(manager, message, context, taskResult, exception);
+      
+      // add task result to context for postHandling
+      context.add(MapKey.HELIX_TASK_RESULT.toString(), taskResult);
+      postHandleMessage();
 
       return taskResult;
     }
@@ -346,18 +361,12 @@ public class HelixStateTransitionHandler extends MessageHandler
   }
 
   @Override
-  public HelixTaskResult handleMessage()
-  {
-    return handleMessageInternal(_message, _notificationContext);
-  }
-
-  @Override
   public void onError(Exception e, ErrorCode code, ErrorType type)
   {
     // All internal error has been processed already, so we can skip them
     if (type == ErrorType.INTERNAL)
     {
-      logger.error("Skip internal error " + e.getMessage() + " " + code);
+      logger.error("Skip internal error. errCode: " + code + ", errMsg: " + e.getMessage());
       return;
     }
     HelixManager manager = _notificationContext.getManager();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index fd0bd8d..6b5a66e 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -26,11 +26,13 @@ import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.NotificationContext.MapKey;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.messaging.handling.GroupMessageHandler.GroupMessageInfo;
@@ -38,6 +40,7 @@ import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
 import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.Attributes;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.monitoring.StateTransitionContext;
 import org.apache.helix.monitoring.StateTransitionDataPoint;
@@ -45,7 +48,7 @@ import org.apache.helix.util.StatusUpdateUtil;
 import org.apache.log4j.Logger;
 
 
-public class HelixTask implements Callable<HelixTaskResult>
+public class HelixTask implements MessageTask
 {
   private static Logger             logger     = Logger.getLogger(HelixTask.class);
   private final Message             _message;
@@ -56,37 +59,10 @@ public class HelixTask implements Callable<HelixTaskResult>
   HelixTaskExecutor                 _executor;
   volatile boolean                  _isTimeout = false;
 
-  public class TimeoutCancelTask extends TimerTask
-  {
-    HelixTaskExecutor   _executor;
-    Message             _message;
-    NotificationContext _context;
-
-    public TimeoutCancelTask(HelixTaskExecutor executor,
-                             Message message,
-                             NotificationContext context)
-    {
-      _executor = executor;
-      _message = message;
-      _context = context;
-    }
-
-    @Override
-    public void run()
-    {
-      _isTimeout = true;
-      logger.warn("Message time out, canceling. id:" + _message.getMsgId()
-          + " timeout : " + _message.getExecutionTimeout());
-      _handler.onTimeout();
-      _executor.cancelTask(_message, _context);
-    }
-
-  }
-
   public HelixTask(Message message,
                    NotificationContext notificationContext,
                    MessageHandler handler,
-                   HelixTaskExecutor executor) throws Exception
+                   HelixTaskExecutor executor)
   {
     this._notificationContext = notificationContext;
     this._message = message;
@@ -99,30 +75,13 @@ public class HelixTask implements Callable<HelixTaskResult>
   @Override
   public HelixTaskResult call()
   {
-    // Start the timeout TimerTask, if necessary
-    Timer timer = null;
-    if (_message.getExecutionTimeout() > 0)
-    {
-      timer = new Timer(true);
-      timer.schedule(new TimeoutCancelTask(_executor, _message, _notificationContext),
-                     _message.getExecutionTimeout());
-      logger.info("Message starts with timeout " + _message.getExecutionTimeout()
-          + " MsgId:" + _message.getMsgId());
-    }
-    else
-    {
-      logger.info("Message does not have timeout. MsgId:" + _message.getMsgId() + "/"
-          + _message.getPartitionName());
-    }
+    HelixTaskResult taskResult = null;
 
-    HelixTaskResult taskResult = new HelixTaskResult();
-
-    Exception exception = null;
-    ErrorType type = ErrorType.INTERNAL;
-    ErrorCode code = ErrorCode.ERROR;
+    ErrorType type = null;
+    ErrorCode code = null;
 
     long start = System.currentTimeMillis();
-    logger.info("msg:" + _message.getMsgId() + " handling task begin, at: " + start);
+    logger.info("handling task: " + getTaskId() + " begin, at: " + start);
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     _statusUpdateUtil.logInfo(_message,
                               HelixTask.class,
@@ -130,147 +89,133 @@ public class HelixTask implements Callable<HelixTaskResult>
                               accessor);
     _message.setExecuteStartTimeStamp(new Date().getTime());
 
+    // add a concurrent map to hold currentStateUpdates for sub-messages of a batch-message
+    // partitionName -> csUpdate
+    if (_message.getBatchMessageMode() == true) {
+  	  _notificationContext.add(MapKey.CURRENT_STATE_UPDATE.toString(), 
+  			  new ConcurrentHashMap<String, CurrentStateUpdate>());
+    }
+
     // Handle the message
     try
     {
       taskResult = _handler.handleMessage();
-      exception = taskResult.getException();
     }
     catch (InterruptedException e)
     {
+      taskResult = new HelixTaskResult();
+      taskResult.setException(e);
+      taskResult.setInterrupted(true);
+
       _statusUpdateUtil.logError(_message,
                                  HelixTask.class,
                                  e,
                                  "State transition interrupted, timeout:" + _isTimeout,
                                  accessor);
       logger.info("Message " + _message.getMsgId() + " is interrupted");
-      taskResult.setInterrupted(true);
-      taskResult.setException(e);
-      exception = e;
     }
     catch (Exception e)
     {
+      taskResult = new HelixTaskResult();
+      taskResult.setException(e);
+      taskResult.setMessage(e.getMessage());
+        
       String errorMessage =
           "Exception while executing a message. " + e + " msgId: " + _message.getMsgId()
               + " type: " + _message.getMsgType();
       logger.error(errorMessage, e);
       _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, accessor);
-      taskResult.setSuccess(false);
-      taskResult.setException(e);
-      taskResult.setMessage(e.getMessage());
-      exception = e;
     }
 
-    // Cancel the timer since the handling is done
-    // it is fine if the TimerTask for canceling is called already
-    if (timer != null)
-    {
-      timer.cancel();
-    }
-
-    if (taskResult.isSucess())
+    // cancel timeout task
+    _executor.cancelTimeoutTask(this);
+    
+    Exception exception = null;
+    try
     {
-      _statusUpdateUtil.logInfo(_message,
+      if (taskResult.isSucess())
+      {
+        _statusUpdateUtil.logInfo(_message,
                                 _handler.getClass(),
                                 "Message handling task completed successfully",
                                 accessor);
-      logger.info("Message " + _message.getMsgId() + " completed.");
-    }
-    else if (taskResult.isInterrupted())
-    {
-      logger.info("Message " + _message.getMsgId() + " is interrupted");
-      code = _isTimeout ? ErrorCode.TIMEOUT : ErrorCode.CANCEL;
-      if (_isTimeout)
-      {
-        int retryCount = _message.getRetryCount();
-        logger.info("Message timeout, retry count: " + retryCount + " MSGID:"
-            + _message.getMsgId());
-        _statusUpdateUtil.logInfo(_message,
+        logger.info("Message " + _message.getMsgId() + " completed.");
+      }
+      else {
+    	  type = ErrorType.INTERNAL;
+    	  
+    	  if (taskResult.isInterrupted())
+          {
+    		  logger.info("Message " + _message.getMsgId() + " is interrupted");
+    		  code = _isTimeout ? ErrorCode.TIMEOUT : ErrorCode.CANCEL;
+    		  if (_isTimeout)
+    		  {
+    			  int retryCount = _message.getRetryCount();
+    			  logger.info("Message timeout, retry count: " + retryCount + " msgId:"
+    					  + _message.getMsgId());
+    			  _statusUpdateUtil.logInfo(_message,
                                   _handler.getClass(),
                                   "Message handling task timeout, retryCount:"
                                       + retryCount,
                                   accessor);
-        // Notify the handler that timeout happens, and the number of retries left
-        // In case timeout happens (time out and also interrupted)
-        // we should retry the execution of the message by re-schedule it in
-        if (retryCount > 0)
-        {
-          _message.setRetryCount(retryCount - 1);
-          _executor.scheduleTask(_message, _handler, _notificationContext);
-          return taskResult;
-        }
-      }
-    }
-    else
-    // logging for errors
-    {
-      String errorMsg =
-          "Message execution failed. msgId: " + _message.getMsgId()
-              + taskResult.getMessage();
-      if (exception != null)
-      {
-        errorMsg += exception;
-      }
-      logger.error(errorMsg, exception);
-      _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, accessor);
-    }
-
-    // Post-processing for the finished task
-    try
-    {
-      if (!_message.getGroupMessageMode())
-      {
-        removeMessageFromZk(accessor, _message);
-        reportMessageStat(_manager, _message, taskResult);
-        sendReply(accessor, _message, taskResult);
-      }
-      else
-      {
-        GroupMessageInfo info = _executor._groupMsgHandler.onCompleteSubMessage(_message); 
-        if (info != null)
-        {
-          // TODO: changed to async update
-          // group update current state
-          Map<PropertyKey, CurrentState> curStateMap = info.merge();
-          for (PropertyKey key : curStateMap.keySet())
-          {
-            accessor.updateProperty(key, curStateMap.get(key));
+    			  // Notify the handler that timeout happens, and the number of retries left
+    			  // In case timeout happens (time out and also interrupted)
+    			  // we should retry the execution of the message by re-scheduling it
+    			  if (retryCount > 0)
+    			  {
+    				  _message.setRetryCount(retryCount - 1);
+                      HelixTask task = new HelixTask(_message, _notificationContext, _handler, _executor);
+                      _executor.scheduleTask(task);
+                      return taskResult;
+    			  }
+    		  }
           }
-
-          // remove group message
+    	  else  // logging for errors
+    	  {
+    		  code = ErrorCode.ERROR;
+    		  String errorMsg =
+    			  "Message execution failed. msgId: " + getTaskId()
+    			  + ", errorMsg: " + taskResult.getMessage();
+    		  logger.error(errorMsg);
+    		  _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, accessor);
+    	  }
+      }
+      
+      if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
+    	  // System.err.println("\t[dbg]remove msg: " + getTaskId());
           removeMessageFromZk(accessor, _message);
           reportMessageStat(_manager, _message, taskResult);
           sendReply(accessor, _message, taskResult);
-        }
+          _executor.finishTask(this);
       }
-      _executor.reportCompletion(_message);
     }
-
-    // TODO: capture errors and log here
     catch (Exception e)
     {
+      exception = e;
+      type = ErrorType.FRAMEWORK;
+      code = ErrorCode.ERROR;
+        
       String errorMessage =
           "Exception after executing a message, msgId: " + _message.getMsgId() + e;
       logger.error(errorMessage, e);
       _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, accessor);
-      exception = e;
-      type = ErrorType.FRAMEWORK;
-      code = ErrorCode.ERROR;
     }
-    //
     finally
     {
       long end = System.currentTimeMillis();
-      logger.info("msg:" + _message.getMsgId() + " handling task completed, results:"
+      logger.info("msg: " + _message.getMsgId() + " handling task completed, results:"
           + taskResult.isSucess() + ", at: " + end + ", took:" + (end - start));
 
       // Notify the handler about any error happened in the handling procedure, so that
       // the handler have chance to finally cleanup
-      if (exception != null)
+      if (type == ErrorType.INTERNAL)
       {
-        _handler.onError(exception, code, type);
+        _handler.onError(taskResult.getException(), code, type);
+      } else if (type == ErrorType.FRAMEWORK) {
+    	  _handler.onError(exception, code, type);
       }
     }
+    
     return taskResult;
   }
 
@@ -370,4 +315,26 @@ public class HelixTask implements Callable<HelixTaskResult>
     }
   }
 
+  @Override
+  public String getTaskId()
+  {
+	  return _message.getId();
+  }
+  
+  @Override
+  public Message getMessage() {
+		return _message;
+  }
+
+  @Override
+  public 	NotificationContext getNotificationContext()
+  {
+	return _notificationContext;
+  }
+
+  @Override
+  public void onTimeout() {
+	_isTimeout = true;
+	_handler.onTimeout();
+  }
 };

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index cf7ebf8..c674d69 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -28,6 +28,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -46,6 +48,7 @@ import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.MessageListener;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.NotificationContext.MapKey;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.NotificationContext.Type;
@@ -61,14 +64,14 @@ import org.apache.helix.util.StatusUpdateUtil;
 import org.apache.log4j.Logger;
 
 
-public class HelixTaskExecutor implements MessageListener
+public class HelixTaskExecutor implements MessageListener, TaskExecutor
 {
   // TODO: we need to further design how to throttle this.
   // From storage point of view, only bootstrap case is expensive
   // and we need to throttle, which is mostly IO / network bounded.
   public static final int                                DEFAULT_PARALLEL_TASKS     = 40;
   // TODO: create per-task type threadpool with customizable pool size
-  protected final Map<String, Future<HelixTaskResult>>   _taskMap;
+  protected final Map<String, MessageTaskInfo>   _taskMap;
   private final Object                                   _lock;
   private final StatusUpdateUtil                         _statusUpdateUtil;
   private final ParticipantMonitor                       _monitor;
@@ -78,8 +81,7 @@ public class HelixTaskExecutor implements MessageListener
   final ConcurrentHashMap<String, MessageHandlerFactory> _handlerFactoryMap         =
                                                                                         new ConcurrentHashMap<String, MessageHandlerFactory>();
 
-  final ConcurrentHashMap<String, ExecutorService>       _threadpoolMap             =
-                                                                                        new ConcurrentHashMap<String, ExecutorService>();
+  final ConcurrentHashMap<String, ExecutorService>       _executorMap;
 
   private static Logger                                  LOG                        =
                                                                                         Logger.getLogger(HelixTaskExecutor.class);
@@ -87,24 +89,31 @@ public class HelixTaskExecutor implements MessageListener
   Map<String, Integer>                                   _resourceThreadpoolSizeMap =
                                                                                         new ConcurrentHashMap<String, Integer>();
 
-  final GroupMessageHandler                              _groupMsgHandler;
+  // timer for scheduling timeout tasks
+  final Timer _timer;
+
 
   public HelixTaskExecutor()
   {
-    _taskMap = new ConcurrentHashMap<String, Future<HelixTaskResult>>();
-    _groupMsgHandler = new GroupMessageHandler();
+    _taskMap = new ConcurrentHashMap<String, MessageTaskInfo>();
+    _executorMap = new ConcurrentHashMap<String, ExecutorService>();
 
     _lock = new Object();
     _statusUpdateUtil = new StatusUpdateUtil();
     _monitor = new ParticipantMonitor();
+    
+    _timer = new Timer(true);	// created as a daemon timer thread to handle task timeout
+
     startMonitorThread();
   }
 
+  @Override
   public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory)
   {
     registerMessageHandlerFactory(type, factory, DEFAULT_PARALLEL_TASKS);
   }
 
+  @Override
   public void registerMessageHandlerFactory(String type,
                                             MessageHandlerFactory factory,
                                             int threadpoolSize)
@@ -118,13 +127,16 @@ public class HelixTaskExecutor implements MessageListener
 
       }
       _handlerFactoryMap.put(type, factory);
-      _threadpoolMap.put(type, Executors.newFixedThreadPool(threadpoolSize));
-      LOG.info("Adding msg factory for type " + type + " threadpool size "
+      ExecutorService executorSvc = Executors.newFixedThreadPool(threadpoolSize);
+      _executorMap.put(type, executorSvc);
+      
+      LOG.info("Added msg-factory for type: " + type + ", threadpool size "
           + threadpoolSize);
     }
     else
     {
-      LOG.error("Ignoring duplicate msg handler factory for type " + type);
+      LOG.warn("Fail to register msg-handler-factory for type: " + type 
+    		  + ", pool-size: " + threadpoolSize + ", factory: " + factory);
     }
   }
 
@@ -167,9 +179,9 @@ public class HelixTaskExecutor implements MessageListener
       if (threadpoolSize > 0)
       {
         String key = MessageType.STATE_TRANSITION.toString() + "." + resourceName;
-        _threadpoolMap.put(key, Executors.newFixedThreadPool(threadpoolSize));
-        LOG.info("Adding per resource threadpool for resource " + resourceName
-            + " with size " + threadpoolSize);
+        _executorMap.put(key, Executors.newFixedThreadPool(threadpoolSize));
+        LOG.info("Added per resource threadpool for resource: " + resourceName
+            + " with size: " + threadpoolSize);
       }
       _resourceThreadpoolSizeMap.put(resourceName, threadpoolSize);
     }
@@ -182,38 +194,81 @@ public class HelixTaskExecutor implements MessageListener
    **/
   ExecutorService findExecutorServiceForMsg(Message message)
   {
-    ExecutorService executorService = _threadpoolMap.get(message.getMsgType());
+    ExecutorService executorService = _executorMap.get(message.getMsgType());
     if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
     {
       String resourceName = message.getResourceName();
       if (resourceName != null)
       {
         String key = message.getMsgType() + "." + resourceName;
-        if (_threadpoolMap.containsKey(key))
+        if (_executorMap.containsKey(key))
         {
-          LOG.info("Find per-resource thread pool with key " + key);
-          executorService = _threadpoolMap.get(key);
+          LOG.info("Find per-resource thread pool with key: " + key);
+          executorService = _executorMap.get(key);
         }
       }
     }
     return executorService;
   }
 
-  public void scheduleTask(Message message,
-                           MessageHandler handler,
-                           NotificationContext notificationContext)
+  // ExecutorService impl's in JDK are thread-safe
+  @Override
+  public List<Future<HelixTaskResult>> invokeAllTasks(List<MessageTask> tasks, long timeout, TimeUnit unit) throws InterruptedException
   {
-    assert (handler != null);
-    synchronized (_lock)
-    {
-      try
-      {
-        String taskId = message.getMsgId() + "/" + message.getPartitionName();
+	  if (tasks == null || tasks.size() == 0) {
+		  return null;
+	  }
+	  
+	  // check all tasks use the same executor-service
+	  ExecutorService exeSvc = findExecutorServiceForMsg(tasks.get(0).getMessage());
+	  for (int i = 1; i < tasks.size(); i++) {
+		  MessageTask task = tasks.get(i);
+		  ExecutorService curExeSvc = findExecutorServiceForMsg(task.getMessage());
+		  if (curExeSvc != exeSvc) {
+			  LOG.error("Fail to invoke all tasks because they are not using the same executor-service");
+			  return null;
+		  }
+	  }
+	  
+	  // TODO: check if any of the task has already been scheduled
+	  	  
+	  // this is a blocking call
+      List<Future<HelixTaskResult>> futures = exeSvc.invokeAll(tasks, timeout, unit);
+      
+      return futures;
+  }
+
+  @Override
+  public boolean cancelTimeoutTask(MessageTask task)
+  {
+	  synchronized(_lock) {
+		  String taskId = task.getTaskId();
+          if (_taskMap.containsKey(taskId)) {
+        	  MessageTaskInfo info = _taskMap.get(taskId);
+        	  if (info._timerTask != null) {
+        		  info._timerTask.cancel();
+        	  }
+        	  return true;
+          }
+          return false;
+	  }
+  }
+  
+  @Override 
+  public boolean scheduleTask(MessageTask task)
+  {
+    String taskId = task.getTaskId();
+    Message message = task.getMessage();
+    NotificationContext notificationContext = task.getNotificationContext();
 
+    
+    try 
+    {
         if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
         {
           checkResourceConfig(message.getResourceName(), notificationContext.getManager());
         }
+        
         LOG.info("Scheduling message: " + taskId);
         // System.out.println("sched msg: " + message.getPartitionName() + "-"
         // + message.getTgtName() + "-" + message.getFromState() + "-"
@@ -223,28 +278,48 @@ public class HelixTaskExecutor implements MessageListener
                                   HelixTaskExecutor.class,
                                   "Message handling task scheduled",
                                   notificationContext.getManager().getHelixDataAccessor());
-
-        HelixTask task = new HelixTask(message, notificationContext, handler, this);
-        if (!_taskMap.containsKey(taskId))
+        
+        // holding _lock guarantees that submitting the task to the ExecutorService and putting its taskInfo into the map happen together
+        synchronized (_lock)
         {
-          LOG.info("Message:" + taskId + " handling task scheduled");
-          Future<HelixTaskResult> future =
-              findExecutorServiceForMsg(message).submit(task);
-          _taskMap.put(taskId, future);
-        }
-        else
-        {
-          _statusUpdateUtil.logWarning(message,
+          if (!_taskMap.containsKey(taskId))
+          {
+          	ExecutorService exeSvc = findExecutorServiceForMsg(message);
+        	Future<HelixTaskResult> future = exeSvc.submit(task);
+        	
+            TimerTask timerTask = null;
+            if (message.getExecutionTimeout() > 0)
+            {
+              timerTask = new MessageTimeoutTask(this, task);
+              _timer.schedule(timerTask, message.getExecutionTimeout());
+              LOG.info("Message starts with timeout " + message.getExecutionTimeout()
+                  + " MsgId: " + task.getTaskId());
+            }
+            else
+            {
+              LOG.debug("Message does not have timeout. MsgId: " + task.getTaskId());
+            }
+
+        	_taskMap.put(taskId, new MessageTaskInfo(task, future, timerTask));
+
+            LOG.info("Message: " + taskId + " handling task scheduled");
+
+            return true;
+          }
+          else
+          {
+            _statusUpdateUtil.logWarning(message,
                                        HelixTaskExecutor.class,
                                        "Message handling task already sheduled for "
                                            + taskId,
                                        notificationContext.getManager()
                                                           .getHelixDataAccessor());
+          }
         }
       }
       catch (Exception e)
       {
-        LOG.error("Error while executing task." + message, e);
+        LOG.error("Error while executing task. " + message, e);
 
         _statusUpdateUtil.logError(message,
                                    HelixTaskExecutor.class,
@@ -253,67 +328,87 @@ public class HelixTaskExecutor implements MessageListener
                                    notificationContext.getManager()
                                                       .getHelixDataAccessor());
       }
-    }
+      return false;
   }
 
-  public void cancelTask(Message message, NotificationContext notificationContext)
+  @Override
+  public boolean cancelTask(MessageTask task)
   {
-    synchronized (_lock)
-    {
-      String taskId = message.getMsgId() + "/" + message.getPartitionName();
+      Message message = task.getMessage();
+      NotificationContext notificationContext = task.getNotificationContext();
+      String taskId = task.getTaskId();
 
-      if (_taskMap.containsKey(taskId))
+      synchronized(_lock) 
       {
-        _statusUpdateUtil.logInfo(message,
+        if (_taskMap.containsKey(taskId))
+        {
+      	  MessageTaskInfo taskInfo = _taskMap.get(taskId);
+    	  // cancel timeout task
+          if (taskInfo._timerTask != null) {
+        	  taskInfo._timerTask.cancel();
+          }
+
+    	  // cancel task
+          Future<HelixTaskResult> future = taskInfo.getFuture();
+
+          _statusUpdateUtil.logInfo(message,
                                   HelixTaskExecutor.class,
-                                  "Trying to cancel the future for " + taskId,
+                                  "Canceling task: " + taskId,
                                   notificationContext.getManager().getHelixDataAccessor());
-        Future<HelixTaskResult> future = _taskMap.get(taskId);
 
-        // If the thread is still running it will be interrupted if cancel(true)
-        // is called. So state transition callbacks should implement logic to
-        // return
-        // if it is interrupted.
-        if (future.cancel(true))
-        {
-          _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceled "
+          // If the thread is still running it will be interrupted if cancel(true)
+          // is called. So state transition callbacks should implement logic to
+          // return
+          // if it is interrupted.
+          if (future.cancel(true))
+          {
+            _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceled task: "
               + taskId, notificationContext.getManager().getHelixDataAccessor());
-          _taskMap.remove(taskId);
-        }
-        else
-        {
-          _statusUpdateUtil.logInfo(message,
+            _taskMap.remove(taskId);
+            return true;
+          }
+          else
+          {
+            _statusUpdateUtil.logInfo(message,
                                     HelixTaskExecutor.class,
-                                    "false when trying to cancel the message " + taskId,
+                                    "fail to cancel task: " + taskId,
                                     notificationContext.getManager()
                                                        .getHelixDataAccessor());
+          }
         }
-      }
-      else
-      {
-        _statusUpdateUtil.logWarning(message,
+        else
+        {
+          _statusUpdateUtil.logWarning(message,
                                      HelixTaskExecutor.class,
-                                     "Future not found when trying to cancel " + taskId,
+                                     "fail to cancel task: " + taskId + ", future not found",
                                      notificationContext.getManager()
                                                         .getHelixDataAccessor());
+        }
       }
-    }
+      return false;
   }
 
-  protected void reportCompletion(Message message) // String msgId)
+  @Override
+  public void finishTask(MessageTask task)
   {
+	Message message = task.getMessage();
+    String taskId = task.getTaskId();
+    LOG.info("message finished: " + taskId + ", took "
+            + (new Date().getTime() - message.getExecuteStartTimeStamp()));
+
     synchronized (_lock)
     {
-      String taskId = message.getMsgId() + "/" + message.getPartitionName();
-      LOG.info("message finished: " + taskId + ", took "
-          + (new Date().getTime() - message.getExecuteStartTimeStamp()));
       if (_taskMap.containsKey(taskId))
       {
-        _taskMap.remove(taskId);
+          MessageTaskInfo info = _taskMap.remove(taskId);
+          if (info._timerTask != null) {
+        	  // ok to cancel multiple times
+        	  info._timerTask.cancel();
+          }
       }
       else
       {
-        LOG.warn("message " + taskId + "not found in task map");
+        LOG.warn("message " + taskId + " not found in task map");
       }
     }
   }
@@ -346,22 +441,18 @@ public class HelixTaskExecutor implements MessageListener
       {
         factory.reset();
       }
-      // Cancel all scheduled future
-      // synchronized (_lock)
+      // Cancel all scheduled tasks
+      synchronized (_lock)
       {
-        for (Future<HelixTaskResult> f : _taskMap.values())
-        {
-          f.cancel(true);
-        }
+          for (MessageTaskInfo info : _taskMap.values())
+          {
+            cancelTask(info._task);
+          }
         _taskMap.clear();
       }
       return;
     }
 
-    HelixManager manager = changeContext.getManager();
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = accessor.keyBuilder();
-
     if (messages == null || messages.size() == 0)
     {
       LOG.info("No Messages to process");
@@ -371,6 +462,10 @@ public class HelixTaskExecutor implements MessageListener
     // sort message by creation timestamp, so message created earlier is processed first
     Collections.sort(messages, Message.CREATE_TIME_COMPARATOR);
 
+    HelixManager manager = changeContext.getManager();
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
     // message handlers created
     List<MessageHandler> handlers = new ArrayList<MessageHandler>();
 
@@ -384,7 +479,6 @@ public class HelixTaskExecutor implements MessageListener
     List<CurrentState> metaCurStates = new ArrayList<CurrentState>();
     Set<String> createCurStateNames = new HashSet<String>();
 
-    changeContext.add(NotificationContext.TASK_EXECUTOR_KEY, this);
     for (Message message : messages)
     {
       // nop messages are simply removed. It is used to trigger onMessage() in
@@ -399,7 +493,7 @@ public class HelixTaskExecutor implements MessageListener
 
       String tgtSessionId = message.getTgtSessionId();
 
-      // if sessionId not match, remove it
+      // sessionId mismatch normally means message comes from expired session, just remove it
       if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*"))
       {
         String warningMessage =
@@ -422,20 +516,21 @@ public class HelixTaskExecutor implements MessageListener
         // read. Instead we keep it until the current state is updated.
         // We will read the message again if there is a new message but we
         // check for the status and ignore if its already read
-        LOG.trace("Message already read. mid: " + message.getMsgId());
+    	if (LOG.isTraceEnabled()) {
+    		LOG.trace("Message already read. msgId: " + message.getMsgId());
+    	}
         continue;
       }
 
       // create message handlers, if handlers not found, leave its state as NEW
       try
       {
-        List<MessageHandler> createHandlers =
-            createMessageHandlers(message, changeContext);
-        if (createHandlers.isEmpty())
+        MessageHandler createHandler = createMessageHandler(message, changeContext);
+        if (createHandler == null)
         {
           continue;
         }
-        handlers.addAll(createHandlers);
+        handlers.add(createHandler);
       }
       catch (Exception e)
       {
@@ -450,10 +545,10 @@ public class HelixTaskExecutor implements MessageListener
                                    error,
                                    accessor);
 
-        // Mark message state UNPROCESSABLE if we hit an exception in creating
-        // message handler. The message will stay on zookeeper but will not be processed
         message.setMsgState(MessageState.UNPROCESSABLE);
-        accessor.updateProperty(message.getKey(keyBuilder, instanceName), message);
+        accessor.removeProperty(message.getKey(keyBuilder, instanceName));
+        LOG.error("Message cannot be proessed: " + message.getRecord(), e);
+
         continue;
       }
 
@@ -487,7 +582,7 @@ public class HelixTaskExecutor implements MessageListener
           metaCurState.setBucketSize(message.getBucketSize());
           metaCurState.setStateModelDefRef(message.getStateModelDef());
           metaCurState.setSessionId(sessionId);
-          metaCurState.setGroupMessageMode(message.getGroupMessageMode());
+          metaCurState.setBatchMessageMode(message.getBatchMessageMode());
           String ftyName = message.getStateModelFactoryName();
           if (ftyName != null)
           {
@@ -512,7 +607,7 @@ public class HelixTaskExecutor implements MessageListener
       }
       catch (Exception e)
       {
-        LOG.error(e);
+        LOG.error("fail to create cur-state znodes for messages: " + readMsgs, e);
       }
     }
 
@@ -523,7 +618,8 @@ public class HelixTaskExecutor implements MessageListener
 
       for (MessageHandler handler : handlers)
       {
-        scheduleTask(handler._message, handler, changeContext);
+          HelixTask task = new HelixTask(handler._message, changeContext, handler, this);
+          scheduleTask(task);
       }
     }
   }
@@ -540,70 +636,35 @@ public class HelixTaskExecutor implements MessageListener
     // the corresponding MessageHandlerFactory is registered
     if (handlerFactory == null)
     {
-      LOG.warn("Fail to find message handler factory for type: " + msgType + " mid:"
+      LOG.warn("Fail to find message handler factory for type: " + msgType + " msgId: "
           + message.getMsgId());
       return null;
     }
 
+    // pass the executor to msg-handler since batch-msg-handler needs task-executor to schedule sub-msgs
+    changeContext.add(MapKey.TASK_EXECUTOR.toString(), this);
     return handlerFactory.createHandler(message, changeContext);
   }
 
-  private List<MessageHandler> createMessageHandlers(Message message,
-                                                     NotificationContext changeContext)
-  {
-    List<MessageHandler> handlers = new ArrayList<MessageHandler>();
-    if (!message.getGroupMessageMode())
-    {
-      LOG.info("Creating handler for message " + message.getMsgId() + "/"
-          + message.getPartitionName());
-
-      MessageHandler handler = createMessageHandler(message, changeContext);
-
-      if (handler != null)
-      {
-        handlers.add(handler);
-      }
-    }
-    else
-    {
-      _groupMsgHandler.put(message);
-
-      List<String> partitionNames = message.getPartitionNames();
-      for (String partitionName : partitionNames)
-      {
-        Message subMsg = new Message(message.getRecord());
-        subMsg.setPartitionName(partitionName);
-        subMsg.setAttribute(Attributes.PARENT_MSG_ID, message.getId());
-
-        LOG.info("Creating handler for group message " + subMsg.getMsgId() + "/"
-            + partitionName);
-        MessageHandler handler = createMessageHandler(subMsg, changeContext);
-        if (handler != null)
-        {
-          handlers.add(handler);
-        }
-      }
-    }
-
-    return handlers;
-  }
-
-  public void shutDown()
+  @Override
+  public void shutdown()
   {
     LOG.info("shutting down TaskExecutor");
+    _timer.cancel();
+    
     synchronized (_lock)
     {
-      for (String msgType : _threadpoolMap.keySet())
+      for (String msgType : _executorMap.keySet())
       {
-        List<Runnable> tasksLeft = _threadpoolMap.get(msgType).shutdownNow();
+        List<Runnable> tasksLeft = _executorMap.get(msgType).shutdownNow();
         LOG.info(tasksLeft.size() + " tasks are still in the threadpool for msgType "
             + msgType);
       }
-      for (String msgType : _threadpoolMap.keySet())
+      for (String msgType : _executorMap.keySet())
       {
         try
         {
-          if (!_threadpoolMap.get(msgType).awaitTermination(200, TimeUnit.MILLISECONDS))
+          if (!_executorMap.get(msgType).awaitTermination(200, TimeUnit.MILLISECONDS))
           {
             LOG.warn(msgType + " is not fully termimated in 200 MS");
             System.out.println(msgType + " is not fully termimated in 200 MS");
@@ -618,25 +679,4 @@ public class HelixTaskExecutor implements MessageListener
     _monitor.shutDown();
     LOG.info("shutdown finished");
   }
-
-  // TODO: remove this
-  public static void main(String[] args) throws Exception
-  {
-    ExecutorService pool = Executors.newFixedThreadPool(DEFAULT_PARALLEL_TASKS);
-    Future<HelixTaskResult> future;
-    future = pool.submit(new Callable<HelixTaskResult>()
-    {
-
-      @Override
-      public HelixTaskResult call() throws Exception
-      {
-        System.out.println("CMTaskExecutor.main(...).new Callable() {...}.call()");
-        return null;
-      }
-
-    });
-    future = pool.submit(new HelixTask(null, null, null, null));
-    Thread.currentThread().join();
-    System.out.println(future.isDone());
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTask.java
new file mode 100644
index 0000000..083d903
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTask.java
@@ -0,0 +1,36 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Callable;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+
+public interface MessageTask extends Callable<HelixTaskResult> {
+	String getTaskId(); // id under which the executor tracks this task in its task map
+	// the message this task was created for
+	Message getMessage();
+	// the notification context associated with this task
+	NotificationContext getNotificationContext();
+	// called by MessageTimeoutTask when the timeout fires, before the executor cancels the task
+	void onTimeout();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTaskInfo.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTaskInfo.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTaskInfo.java
new file mode 100644
index 0000000..652ca21
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTaskInfo.java
@@ -0,0 +1,41 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.TimerTask;
+import java.util.concurrent.Future;
+
+public class MessageTaskInfo {
+	final MessageTask _task; // the task itself; the executor passes it to cancelTask()
+	final Future<HelixTaskResult> _future; // presumably the future returned on submission - TODO confirm
+	final TimerTask _timerTask; // timeout timer for the task; finishTask() null-checks it before cancel()
+	// simple holder: stores the three references as given, no validation
+	public MessageTaskInfo(MessageTask task, Future<HelixTaskResult> future, TimerTask timerTask)
+	{
+		_task = task;
+		_future = future;
+		_timerTask = timerTask;
+	}
+
+	public Future<HelixTaskResult> getFuture() {
+		return _future;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTimeoutTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTimeoutTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTimeoutTask.java
new file mode 100644
index 0000000..c89a7be
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTimeoutTask.java
@@ -0,0 +1,50 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.TimerTask;
+
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+
+public class MessageTimeoutTask extends TimerTask {
+	private static Logger LOG = Logger.getLogger(MessageTimeoutTask.class);
+
+	final HelixTaskExecutor _executor;
+	final MessageTask _task;
+
+	public MessageTimeoutTask(HelixTaskExecutor executor, MessageTask task)
+	{
+		_executor = executor;
+		_task = task;
+	}
+
+	@Override
+	public void run() {
+		Message message = _task.getMessage();
+		// timer callback: log the timeout, give the task its onTimeout() callback,
+		// and only then ask the executor to cancel the task
+		LOG.warn("Message time out, canceling. id:" + message.getMsgId() + " timeout : "
+		        + message.getExecutionTimeout());
+		_task.onTimeout();
+		_executor.cancelTask(_task);
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java
new file mode 100644
index 0000000..b038e32
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java
@@ -0,0 +1,92 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public interface TaskExecutor {
+	public static final int DEFAULT_PARALLEL_TASKS = 40;
+
+	/**
+	 * register message handler factory this executor can handle
+	 * 
+	 * @param type message type the factory handles
+	 * @param factory factory that creates handlers for messages of that type
+	 */
+	public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory);
+
+	/**
+	 * register message handler factory this executor can handle with specified
+	 * thread-pool size
+	 * 
+	 * @param type message type the factory handles
+	 * @param factory factory that creates handlers for messages of that type
+	 * @param threadPoolSize size of the thread-pool used for this message type
+	 */
+	public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
+	        int threadPoolSize);
+
+	/**
+	 * schedule a message execution
+	 * 
+	 * @param task the message task to schedule
+	 * @return status of the scheduling attempt (NOTE(review): the exact meaning
+	 *         of true/false is defined by the implementation - confirm)
+	 */
+	public boolean scheduleTask(MessageTask task);
+
+	/**
+	 * blocking on scheduling all tasks; timeout and unit presumably bound the
+	 * wait, as in ExecutorService.invokeAll - TODO confirm
+	 * @param tasks the message tasks to invoke
+	 */
+	public List<Future<HelixTaskResult>> invokeAllTasks(List<MessageTask> tasks, long timeout, TimeUnit unit)
+	        throws InterruptedException;
+
+	/**
+	 * cancel a message execution
+	 * 
+	 * @param task the message task to cancel
+	 * @return status of the cancel attempt (NOTE(review): confirm exact semantics)
+	 */
+	public boolean cancelTask(MessageTask task);
+
+	/**
+	 * cancel the timeout for the given task
+	 * 
+	 * @param task the message task whose timeout should be cancelled
+	 * @return status of the cancel attempt (NOTE(review): confirm exact semantics)
+	 */
+	public boolean cancelTimeoutTask(MessageTask task);
+	
+	/**
+	 * finish a message execution
+	 * 
+	 * @param task the message task that has finished
+	 */
+	public void finishTask(MessageTask task);
+
+	/**
+	 * shutdown executor
+	 */
+	public void shutdown();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 90cb95a..8de9a53 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -29,10 +29,13 @@ import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.NotificationContext.MapKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.messaging.handling.BatchMessageHandler;
 import org.apache.helix.messaging.handling.HelixStateTransitionHandler;
 import org.apache.helix.messaging.handling.HelixTaskExecutor;
 import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.TaskExecutor;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.StateModelDefinition;
@@ -49,12 +52,20 @@ public class HelixStateMachineEngine implements StateMachineEngine
 
   // StateModelName->FactoryName->StateModelFactory
   private final Map<String, Map<String, StateModelFactory<? extends StateModel>>> _stateModelFactoryMap;
-  StateModelParser _stateModelParser;
-
+  private final StateModelParser _stateModelParser;
   private final HelixManager _manager;
-
   private final ConcurrentHashMap<String, StateModelDefinition> _stateModelDefs;
 
+  public HelixStateMachineEngine(HelixManager manager)
+  {
+    _stateModelParser = new StateModelParser();
+    _manager = manager;
+
+    _stateModelFactoryMap =
+        new ConcurrentHashMap<String, Map<String, StateModelFactory<? extends StateModel>>>();
+    _stateModelDefs = new ConcurrentHashMap<String, StateModelDefinition>();
+  }
+
   public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName)
   {
     return getStateModelFactory(stateModelName,
@@ -71,16 +82,6 @@ public class HelixStateMachineEngine implements StateMachineEngine
     return _stateModelFactoryMap.get(stateModelName).get(factoryName);
   }
 
-  public HelixStateMachineEngine(HelixManager manager)
-  {
-    _stateModelParser = new StateModelParser();
-    _manager = manager;
-
-    _stateModelFactoryMap =
-        new ConcurrentHashMap<String, Map<String, StateModelFactory<? extends StateModel>>>();
-    _stateModelDefs = new ConcurrentHashMap<String, StateModelDefinition>();
-  }
-
   @Override
   public boolean registerStateModelFactory(String stateModelDef,
                                            StateModelFactory<? extends StateModel> factory)
@@ -191,8 +192,8 @@ public class HelixStateMachineEngine implements StateMachineEngine
 
     if (!type.equals(MessageType.STATE_TRANSITION.toString()))
     {
-      throw new HelixException("Unexpected msg type for message " + message.getMsgId()
-          + " type:" + message.getMsgType());
+      throw new HelixException("Expect state-transition message type, but was " 
+    		  + message.getMsgType() + ", msgId: " + message.getMsgId());
     }
 
     String partitionKey = message.getPartitionName();
@@ -203,7 +204,7 @@ public class HelixStateMachineEngine implements StateMachineEngine
 
     if (stateModelName == null)
     {
-      logger.error("message does not contain stateModelDef");
+      logger.error("Fail to create msg-handler because message does not contain stateModelDef. msgId: " + message.getId());
       return null;
     }
 
@@ -213,12 +214,12 @@ public class HelixStateMachineEngine implements StateMachineEngine
       factoryName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
     }
 
-    StateModelFactory stateModelFactory =
+    StateModelFactory<? extends StateModel> stateModelFactory =
         getStateModelFactory(stateModelName, factoryName);
     if (stateModelFactory == null)
     {
-      logger.warn("Cannot find stateModelFactory for model:" + stateModelName
-          + " using factoryName:" + factoryName + " for resourceGroup:" + resourceName);
+      logger.warn("Fail to create msg-handler because cannot find stateModelFactory for model: " + stateModelName
+          + " using factoryName: " + factoryName + " for resource: " + resourceName);
       return null;
     }
 
@@ -231,39 +232,48 @@ public class HelixStateMachineEngine implements StateMachineEngine
           accessor.getProperty(keyBuilder.stateModelDef(stateModelName));
       if (stateModelDef == null)
       {
-        throw new HelixException("stateModelDef for " + stateModelName
-            + " does NOT exists");
+        throw new HelixException("fail to create msg-handler because stateModelDef for " + stateModelName
+            + " does NOT exist");
       }
       _stateModelDefs.put(stateModelName, stateModelDef);
     }
 
-    // create currentStateDelta for this partition
-    String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
-    StateModel stateModel = stateModelFactory.getStateModel(partitionKey);
-    if (stateModel == null)
-    {
-      stateModelFactory.createAndAddStateModel(partitionKey);
-      stateModel = stateModelFactory.getStateModel(partitionKey);
-      stateModel.updateState(initState);
-    }
+    if (message.getBatchMessageMode() == false) {
+        // create currentStateDelta for this partition
+        String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
+        StateModel stateModel = stateModelFactory.getStateModel(partitionKey);
+        if (stateModel == null)
+        {
+          stateModel = stateModelFactory.createAndAddStateModel(partitionKey);
+          stateModel.updateState(initState);
+        }
 
-    CurrentState currentStateDelta = new CurrentState(resourceName);
-    currentStateDelta.setSessionId(sessionId);
-    currentStateDelta.setStateModelDefRef(stateModelName);
-    currentStateDelta.setStateModelFactoryName(factoryName);
-    currentStateDelta.setBucketSize(bucketSize);
-
-    currentStateDelta.setState(partitionKey, (stateModel.getCurrentState() == null)
-        ? initState : stateModel.getCurrentState());
-
-    HelixTaskExecutor executor = (HelixTaskExecutor) context.get(NotificationContext.TASK_EXECUTOR_KEY);
-    
-    return new HelixStateTransitionHandler(stateModel,
-                                           message,
-                                           context,
-                                           currentStateDelta,
-                                           executor);
-  }
+        // TODO: move currentStateDelta to StateTransitionMsgHandler
+        CurrentState currentStateDelta = new CurrentState(resourceName);
+        currentStateDelta.setSessionId(sessionId);
+        currentStateDelta.setStateModelDefRef(stateModelName);
+        currentStateDelta.setStateModelFactoryName(factoryName);
+        currentStateDelta.setBucketSize(bucketSize);
+
+        currentStateDelta.setState(partitionKey, (stateModel.getCurrentState() == null)
+            ? initState : stateModel.getCurrentState());
+
+        return new HelixStateTransitionHandler(stateModel,
+                                               message,
+                                               context,
+                                               currentStateDelta);
+    } else
+    {    	
+    	// get executor-service for the message
+    	TaskExecutor executor = (TaskExecutor) context.get(MapKey.TASK_EXECUTOR.toString());
+    	if (executor == null)
+    	{
+    		logger.error("fail to get executor-service for batch message: " + message.getId() 
+    				+ ". msgType: " + message.getMsgType() + ", resource: " + message.getResourceName());
+    		return null;
+    	}
+    	return new BatchMessageHandler(message, context, this, executor);
+    }  }
 
   @Override
   public String getMessageType()

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
index 821127c..8207052 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
@@ -51,9 +51,11 @@ public abstract class StateModelFactory<T extends StateModel>
    * 
    * @param partitionName
    */
-  public void createAndAddStateModel(String partitionName)
+  public T createAndAddStateModel(String partitionName)
   {
-    _stateModelMap.put(partitionName, createNewStateModel(partitionName));
+		T stateModel = createNewStateModel(partitionName);
+	    _stateModelMap.put(partitionName, stateModel);
+	    return stateModel;
   }
 
   /**