You are viewing a plain-text version of this content; the canonical link to it was not preserved in this copy.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/02/05 22:06:14 UTC
[2/3] git commit: HELIX-42: refactor batch message handling
HELIX-42: refactor batch message handling
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/a1bf1244
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/a1bf1244
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/a1bf1244
Branch: refs/heads/master
Commit: a1bf1244f18a0b43bc53aabe702a03516df72fc0
Parents: 1ec3ac6
Author: zzhang <zz...@uci.edu>
Authored: Tue Feb 5 13:06:04 2013 -0800
Committer: zzhang <zz...@uci.edu>
Committed: Tue Feb 5 13:06:04 2013 -0800
----------------------------------------------------------------------
.../main/java/org/apache/helix/HelixProperty.java | 10 +-
.../java/org/apache/helix/NotificationContext.java | 8 +-
.../stages/CurrentStateComputationStage.java | 2 +-
.../stages/ResourceComputationStage.java | 4 +-
.../controller/stages/TaskAssignmentStage.java | 2 +-
.../apache/helix/manager/zk/ZKHelixManager.java | 2 +-
.../messaging/handling/BatchMessageHandler.java | 228 +++++++++
.../messaging/handling/CurrentStateUpdate.java | 40 ++
.../messaging/handling/HelixBatchMessageTask.java | 121 +++++
.../handling/HelixStateTransitionHandler.java | 73 ++--
.../apache/helix/messaging/handling/HelixTask.java | 241 ++++------
.../messaging/handling/HelixTaskExecutor.java | 364 ++++++++-------
.../helix/messaging/handling/MessageTask.java | 36 ++
.../helix/messaging/handling/MessageTaskInfo.java | 41 ++
.../messaging/handling/MessageTimeoutTask.java | 50 ++
.../helix/messaging/handling/TaskExecutor.java | 92 ++++
.../helix/participant/HelixStateMachineEngine.java | 104 +++--
.../statemachine/StateModelFactory.java | 6 +-
.../src/test/java/org/apache/helix/Mocks.java | 9 +-
.../org/apache/helix/TestHelixTaskExecutor.java | 16 +-
.../org/apache/helix/TestHelixTaskHandler.java | 4 +-
.../apache/helix/integration/TestBatchMessage.java | 298 ++++++++++++
.../apache/helix/integration/TestGroupMessage.java | 213 ---------
.../handling/TestConfigThreadpoolSize.java | 4 +-
.../messaging/handling/TestHelixTaskExecutor.java | 41 ++-
.../handling/TestResourceThreadpoolSize.java | 2 +-
26 files changed, 1380 insertions(+), 631 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/HelixProperty.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixProperty.java b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
index 080ee58..31ac50a 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
@@ -35,7 +35,7 @@ public class HelixProperty
public enum HelixPropertyAttribute
{
BUCKET_SIZE,
- GROUP_MESSAGE_MODE
+ BATCH_MESSAGE_MODE
}
protected final ZNRecord _record;
@@ -178,16 +178,16 @@ public class HelixProperty
return records;
}
- public void setGroupMessageMode(boolean enable)
+ public void setBatchMessageMode(boolean enable)
{
- _record.setSimpleField(HelixPropertyAttribute.GROUP_MESSAGE_MODE.toString(), ""
+ _record.setSimpleField(HelixPropertyAttribute.BATCH_MESSAGE_MODE.toString(), ""
+ enable);
}
- public boolean getGroupMessageMode()
+ public boolean getBatchMessageMode()
{
String enableStr =
- _record.getSimpleField(HelixPropertyAttribute.GROUP_MESSAGE_MODE.toString());
+ _record.getSimpleField(HelixPropertyAttribute.BATCH_MESSAGE_MODE.toString());
if (enableStr == null)
{
return false;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/NotificationContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/NotificationContext.java b/helix-core/src/main/java/org/apache/helix/NotificationContext.java
index 0ddc15c..f683e77 100644
--- a/helix-core/src/main/java/org/apache/helix/NotificationContext.java
+++ b/helix-core/src/main/java/org/apache/helix/NotificationContext.java
@@ -24,8 +24,12 @@ import java.util.Map;
public class NotificationContext
{
- // keys used for object map
- public static final String TASK_EXECUTOR_KEY = "TASK_EXECUTOR";
+ // keys used for object map
+ public enum MapKey {
+ TASK_EXECUTOR,
+ CURRENT_STATE_UPDATE,
+ HELIX_TASK_RESULT
+ }
private Map<String, Object> _map;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index be501dd..6612ee0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -79,7 +79,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage
continue;
}
- if (!message.getGroupMessageMode())
+ if (!message.getBatchMessageMode())
{
String partitionName = message.getPartitionName();
Partition partition = resource.getPartition(partitionName);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 663eb2a..00ba8eb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -71,7 +71,7 @@ public class ResourceComputationStage extends AbstractBaseStage
resource.setStateModelDefRef(idealState.getStateModelDefRef());
resource.setStateModelFactoryName(idealState.getStateModelFactoryName());
resource.setBucketSize(idealState.getBucketSize());
- resource.setGroupMessageMode(idealState.getGroupMessageMode());
+ resource.setGroupMessageMode(idealState.getBatchMessageMode());
}
}
}
@@ -107,7 +107,7 @@ public class ResourceComputationStage extends AbstractBaseStage
resource.setStateModelDefRef(currentState.getStateModelDefRef());
resource.setStateModelFactoryName(currentState.getStateModelFactoryName());
resource.setBucketSize(currentState.getBucketSize());
- resource.setGroupMessageMode(currentState.getGroupMessageMode());
+ resource.setGroupMessageMode(currentState.getBatchMessageMode());
}
if (currentState.getStateModelDefRef() == null)
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index 822fcb8..0a6c250 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -110,7 +110,7 @@ public class TaskAssignmentStage extends AbstractBaseStage
if (!groupMessages.containsKey(key))
{
Message groupMessage = new Message(message.getRecord());
- groupMessage.setGroupMessageMode(true);
+ groupMessage.setBatchMessageMode(true);
outputMessages.add(groupMessage);
groupMessages.put(key, groupMessage);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 1ba8985..3d13cd0 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -456,7 +456,7 @@ public class ZKHelixManager implements HelixManager
* shutdown thread pool first to avoid reset() being invoked in the middle of state
* transition
*/
- _messagingService.getExecutor().shutDown();
+ _messagingService.getExecutor().shutdown();
resetHandlers();
_helixAccessor.shutdown();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java
new file mode 100644
index 0000000..7ae0c38
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java
@@ -0,0 +1,262 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.NotificationContext.MapKey;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.Attributes;
+import org.apache.log4j.Logger;
+
+/**
+ * Handles a batch message by splitting it into one sub-message per partition, running the
+ * sub-messages through the {@link TaskExecutor}, and then writing the current-state deltas that
+ * the sub-message handlers recorded in the notification context.
+ */
+public class BatchMessageHandler extends MessageHandler {
+  private static final Logger LOG = Logger.getLogger(BatchMessageHandler.class);
+
+  final MessageHandlerFactory _msgHandlerFty;
+  final TaskExecutor _executor;
+  final List<Message> _subMessages;
+  final List<MessageHandler> _subMessageHandlers;
+
+  /**
+   * @param msg the batch message; one sub-message is created per entry of its partition names
+   * @param context notification context shared with the sub-message handlers
+   * @param fty factory used to create a handler for each sub-message
+   * @param executor executor that runs the sub-message tasks
+   * @throws HelixException if {@code fty} or {@code executor} is null
+   */
+  public BatchMessageHandler(Message msg, NotificationContext context, MessageHandlerFactory fty,
+      TaskExecutor executor) {
+    super(msg, context);
+
+    if (fty == null || executor == null) {
+      throw new HelixException("MessageHandlerFactory | TaskExecutor can't be null");
+    }
+
+    _msgHandlerFty = fty;
+    _executor = executor;
+
+    // create one sub-message per partition
+    _subMessages = new ArrayList<Message>();
+    List<String> partitionKeys = _message.getPartitionNames();
+    for (String partitionKey : partitionKeys) {
+      // assign a new message id, put batch-msg-id to parent-id field
+      Message subMsg = new Message(_message.getRecord(), UUID.randomUUID().toString());
+      subMsg.setPartitionName(partitionKey);
+      subMsg.setAttribute(Attributes.PARENT_MSG_ID, _message.getId());
+      subMsg.setBatchMessageMode(false);
+
+      _subMessages.add(subMsg);
+    }
+
+    // create sub-message handlers; handler i serves _subMessages.get(i)
+    _subMessageHandlers = createMsgHandlers(_subMessages, context);
+  }
+
+  /**
+   * Creates one handler per message with the configured factory. The returned list is
+   * index-aligned with {@code msgs}; entries are whatever the factory returns (possibly null).
+   */
+  List<MessageHandler> createMsgHandlers(List<Message> msgs, NotificationContext context) {
+    List<MessageHandler> handlers = new ArrayList<MessageHandler>(msgs.size());
+    for (Message msg : msgs) {
+      handlers.add(_msgHandlerFty.createHandler(msg, context));
+    }
+    return handlers;
+  }
+
+  /** Hook invoked before the sub-messages are executed; currently a no-op. */
+  public void preHandleMessage() {
+    // TODO add batch-message-handler.start() here
+  }
+
+  /**
+   * Hook invoked after the sub-messages finish. Merges the current-state deltas that sub-message
+   * handlers stored under {@link MapKey#CURRENT_STATE_UPDATE} and writes one update per property
+   * path. Does nothing if no deltas were recorded.
+   */
+  public void postHandleMessage() {
+    // TODO add batch-message-handler.end() here
+
+    @SuppressWarnings("unchecked")
+    ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap =
+        (ConcurrentHashMap<String, CurrentStateUpdate>) _notificationContext
+            .get(MapKey.CURRENT_STATE_UPDATE.toString());
+    if (csUpdateMap == null) {
+      return;
+    }
+
+    HelixManager manager = _notificationContext.getManager();
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Map<PropertyKey, CurrentState> csUpdate = mergeCurStateUpdate(csUpdateMap);
+
+    // TODO: change to use asyncSet
+    for (Map.Entry<PropertyKey, CurrentState> entry : csUpdate.entrySet()) {
+      accessor.updateProperty(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Executes all sub-messages and blocks until they finish or the execution timeout elapses
+   * (a non-positive timeout means no timeout). Sub-messages are grouped into
+   * {@link HelixBatchMessageTask}s of {@code exeBatchSize} each; the batch succeeds only if
+   * every group succeeds.
+   *
+   * <p>The result is stored under {@link MapKey#HELIX_TASK_RESULT} before
+   * {@link #postHandleMessage()} runs. If scheduling the tasks throws, the returned result
+   * carries the exception (HelixTask is expected to call {@link #onError} in that case).
+   */
+  @Override
+  public HelixTaskResult handleMessage() {
+    HelixTaskResult result = null;
+    List<Future<HelixTaskResult>> futures = null;
+    List<MessageTask> batchTasks = new ArrayList<MessageTask>();
+
+    // TODO sync on resource level
+    try {
+      preHandleMessage();
+
+      int exeBatchSize = 1; // TODO: getExeBatchSize from msg
+      int subMsgCount = _subMessages.size();
+      for (int i = 0; i < subMsgCount; i += exeBatchSize) {
+        // the last group may hold fewer than exeBatchSize sub-messages
+        int end = Math.min(i + exeBatchSize, subMsgCount);
+        List<Message> msgs = _subMessages.subList(i, end);
+        List<MessageHandler> handlers = _subMessageHandlers.subList(i, end);
+        batchTasks.add(new HelixBatchMessageTask(_message, msgs, handlers, _notificationContext));
+      }
+
+      // invokeAllTasks() is a blocking call
+      long timeout = _message.getExecutionTimeout();
+      if (timeout <= 0) {
+        timeout = Long.MAX_VALUE;
+      }
+      futures = _executor.invokeAllTasks(batchTasks, timeout, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      LOG.error("fail to execute batchMsg: " + _message.getId(), e);
+      result = new HelixTaskResult();
+      result.setException(e);
+      // HelixTask will call onError on this batch-msg-handler
+    }
+
+    // combine sub-results to result
+    if (futures != null) {
+      result = new HelixTaskResult();
+      result.setSuccess(combineResults(futures, batchTasks));
+    }
+
+    // pass task-result to post-handle-msg
+    _notificationContext.add(MapKey.HELIX_TASK_RESULT.toString(), result);
+    postHandleMessage();
+
+    return result;
+  }
+
+  /**
+   * Waits for each future and reports whether every batch task succeeded. A task that returned
+   * no result, reported failure, threw, or was cancelled counts as a failure. Cancellation is
+   * expected if invokeAllTasks cancels unfinished tasks at the timeout (as
+   * ExecutorService.invokeAll does); Future.get() then throws the unchecked
+   * CancellationException, which is caught here so that postHandleMessage() still runs.
+   */
+  private boolean combineResults(List<Future<HelixTaskResult>> futures,
+      List<MessageTask> batchTasks) {
+    boolean isBatchTaskSucceed = true;
+    for (int i = 0; i < futures.size(); i++) {
+      Future<HelixTaskResult> future = futures.get(i);
+      MessageTask subTask = batchTasks.get(i);
+      try {
+        HelixTaskResult subTaskResult = future.get();
+        if (subTaskResult == null || !subTaskResult.isSucess()) {
+          isBatchTaskSucceed = false;
+        }
+      } catch (InterruptedException e) {
+        isBatchTaskSucceed = false;
+        LOG.error("interrupted in executing batch-msg: " + _message.getId() + ", sub-msg: "
+            + subTask.getTaskId(), e);
+      } catch (ExecutionException e) {
+        isBatchTaskSucceed = false;
+        LOG.error("fail to execute batch-msg: " + _message.getId() + ", sub-msg: "
+            + subTask.getTaskId(), e);
+      } catch (CancellationException e) {
+        isBatchTaskSucceed = false;
+        LOG.error("cancelled in executing batch-msg: " + _message.getId() + ", sub-msg: "
+            + subTask.getTaskId(), e);
+      }
+    }
+    return isBatchTaskSucceed;
+  }
+
+  /** Forwards the error to every non-null sub-message handler. */
+  @Override
+  public void onError(Exception e, ErrorCode code, ErrorType type) {
+    // if one sub-message execution fails, call onError on all sub-message handlers
+    for (MessageHandler handler : _subMessageHandlers) {
+      if (handler != null) {
+        handler.onError(e, code, type);
+      }
+    }
+  }
+
+  /**
+   * Groups current-state updates by property path and merges the deltas that share a path, so
+   * that postHandleMessage() writes each path once.
+   *
+   * TODO: optimize this based on the fact that each cs update is for a distinct partition
+   */
+  private Map<PropertyKey, CurrentState> mergeCurStateUpdate(
+      ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap) {
+    Map<String, CurrentStateUpdate> curStateUpdateMap = new HashMap<String, CurrentStateUpdate>();
+    for (CurrentStateUpdate update : csUpdateMap.values()) {
+      String path = update._key.getPath(); // TODO: this is time consuming, optimize it
+      CurrentStateUpdate existing = curStateUpdateMap.get(path);
+      if (existing == null) {
+        curStateUpdateMap.put(path, update);
+      } else {
+        existing.merge(update._delta);
+      }
+    }
+
+    Map<PropertyKey, CurrentState> ret = new HashMap<PropertyKey, CurrentState>();
+    for (CurrentStateUpdate update : curStateUpdateMap.values()) {
+      ret.put(update._key, update._delta);
+    }
+    return ret;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/CurrentStateUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/CurrentStateUpdate.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/CurrentStateUpdate.java
new file mode 100644
index 0000000..ecce683
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/CurrentStateUpdate.java
@@ -0,0 +1,50 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+
+/**
+ * A current-state delta paired with the property key it should be written to. Deltas that target
+ * the same property path can be combined with {@link #merge(CurrentState)} before being written.
+ */
+public class CurrentStateUpdate {
+  /** Property key the delta is written to. */
+  final PropertyKey _key;
+
+  /** Delta to write; {@link #merge} folds other deltas into its record in place. */
+  final CurrentState _delta;
+
+  CurrentStateUpdate(PropertyKey key, CurrentState delta) {
+    this._key = key;
+    this._delta = delta;
+  }
+
+  /**
+   * Merges the record of {@code anotherDelta} into this update's delta record (mutating it).
+   *
+   * @param anotherDelta delta whose record is merged in
+   */
+  public void merge(CurrentState anotherDelta) {
+    CurrentState target = this._delta;
+    target.getRecord().merge(anotherDelta.getRecord());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
new file mode 100644
index 0000000..f4c82b3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
@@ -0,0 +1,135 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+
+/**
+ * A {@link MessageTask} that sequentially runs the handlers for a group of sub-messages
+ * belonging to one batch message.
+ */
+public class HelixBatchMessageTask implements MessageTask {
+  private static final Logger LOG = Logger.getLogger(HelixBatchMessageTask.class);
+
+  final NotificationContext _context;
+  final Message _batchMsg;
+  final List<Message> _subMsgs;
+  final List<MessageHandler> _handlers;
+
+  /**
+   * @param batchMsg the parent batch message
+   * @param subMsgs sub-messages handled by this task (used to build the task id)
+   * @param handlers handlers to run in order; null entries are skipped
+   * @param context notification context returned by {@link #getNotificationContext()}
+   */
+  public HelixBatchMessageTask(Message batchMsg, List<Message> subMsgs,
+      List<MessageHandler> handlers, NotificationContext context) {
+    _batchMsg = batchMsg;
+    _context = context;
+    _subMsgs = subMsgs;
+    _handlers = handlers;
+  }
+
+  /**
+   * Runs every non-null handler in order.
+   *
+   * @return an unsuccessful result if any handler reported failure or returned no result; a
+   *         result carrying the exception if a handler threw (later handlers are then not run);
+   *         otherwise a successful result
+   */
+  @Override
+  public HelixTaskResult call() throws Exception {
+    long start = System.currentTimeMillis();
+    LOG.info("taskId:" + getTaskId() + " handling task begin, at: " + start);
+
+    boolean isSucceed = true;
+    try {
+      for (MessageHandler handler : _handlers) {
+        if (handler == null) {
+          continue;
+        }
+        HelixTaskResult subTaskResult = handler.handleMessage();
+        // if any sub-message fails or yields no result, the whole task fails
+        if (subTaskResult == null || !subTaskResult.isSucess()) {
+          isSucceed = false;
+        }
+      }
+    } catch (Exception e) {
+      String errorMessage = "Exception while executing task: " + getTaskId();
+      LOG.error(errorMessage, e);
+
+      HelixTaskResult errorResult = new HelixTaskResult();
+      errorResult.setException(e);
+      errorResult.setMessage(e.getMessage());
+      return errorResult;
+    }
+
+    if (isSucceed) {
+      LOG.info("task: " + getTaskId() + " completed sucessfully");
+    }
+
+    HelixTaskResult taskResult = new HelixTaskResult();
+    taskResult.setSuccess(isSucceed);
+    return taskResult;
+  }
+
+  /** Returns an id of the form {@code <batchMsgId>/[<subMsgId>, ...]}. */
+  @Override
+  public String getTaskId() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(_batchMsg.getId());
+    sb.append("/");
+    List<String> msgIdList = new ArrayList<String>();
+    if (_subMsgs != null) {
+      for (Message msg : _subMsgs) {
+        msgIdList.add(msg.getId());
+      }
+    }
+    sb.append(msgIdList);
+    return sb.toString();
+  }
+
+  /** Returns the parent batch message. */
+  @Override
+  public Message getMessage() {
+    return _batchMsg;
+  }
+
+  /** Returns the notification context this task was created with. */
+  @Override
+  public NotificationContext getNotificationContext() {
+    return _context;
+  }
+
+  /** Forwards the timeout to every non-null handler of this task. */
+  @Override
+  public void onTimeout() {
+    for (MessageHandler handler : _handlers) {
+      if (handler != null) {
+        handler.onTimeout();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 4616123..20b2ff1 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -25,11 +25,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
+import org.apache.helix.NotificationContext.MapKey;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecordBucketizer;
import org.apache.helix.ZNRecordDelta;
@@ -37,6 +39,7 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecordDelta.MergeOperation;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.Attributes;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelParser;
import org.apache.helix.participant.statemachine.StateTransitionError;
@@ -60,25 +63,24 @@ public class HelixStateTransitionHandler extends MessageHandler
private final StateModelParser _transitionMethodFinder;
private final CurrentState _currentStateDelta;
volatile boolean _isTimeout = false;
- private final HelixTaskExecutor _executor;
public HelixStateTransitionHandler(StateModel stateModel,
Message message,
NotificationContext context,
- CurrentState currentStateDelta,
- HelixTaskExecutor executor)
+ CurrentState currentStateDelta)
{
super(message, context);
_stateModel = stateModel;
_statusUpdateUtil = new StatusUpdateUtil();
_transitionMethodFinder = new StateModelParser();
_currentStateDelta = currentStateDelta;
- _executor = executor;
}
- private void prepareMessageExecution(HelixManager manager, Message message) throws HelixException,
- HelixStateMismatchException
+ void preHandleMessage() throws Exception
{
+ Message message = _message;
+ HelixManager manager = _notificationContext.getManager();
+
if (!message.isValid())
{
String errorMessage =
@@ -93,7 +95,7 @@ public class HelixStateTransitionHandler extends MessageHandler
logger.error(errorMessage);
throw new HelixException(errorMessage);
}
- // DataAccessor accessor = manager.getDataAccessor();
+
HelixDataAccessor accessor = manager.getHelixDataAccessor();
String partitionName = message.getPartitionName();
@@ -119,12 +121,13 @@ public class HelixStateTransitionHandler extends MessageHandler
}
}
- void postExecutionMessage(HelixManager manager,
- Message message,
- NotificationContext context,
- HelixTaskResult taskResult,
- Exception exception)
+ void postHandleMessage()
{
+ Message message = _message;
+ HelixManager manager = _notificationContext.getManager();
+ HelixTaskResult taskResult = (HelixTaskResult) _notificationContext.get(MapKey.HELIX_TASK_RESULT.toString());
+ Exception exception = taskResult.getException();
+
String partitionKey = message.getPartitionName();
String resource = message.getResourceName();
String sessionId = message.getTgtSessionId();
@@ -209,7 +212,7 @@ public class HelixStateTransitionHandler extends MessageHandler
return;
}
}
- _stateModel.rollbackOnError(message, context, error);
+ _stateModel.rollbackOnError(message, _notificationContext, error);
_currentStateDelta.setState(partitionKey, "ERROR");
_stateModel.updateState("ERROR");
}
@@ -222,32 +225,39 @@ public class HelixStateTransitionHandler extends MessageHandler
sessionId,
resource,
bucketizer.getBucketName(partitionKey));
- if (!_message.getGroupMessageMode())
+ if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null)
{
+ // normal message
accessor.updateProperty(key, _currentStateDelta);
}
else
{
- _executor._groupMsgHandler.addCurStateUpdate(_message, key, _currentStateDelta);
+ // sub-message of a batch message
+ ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap
+ = (ConcurrentHashMap<String, CurrentStateUpdate>) _notificationContext.get(MapKey.CURRENT_STATE_UPDATE.toString());
+ csUpdateMap.put(partitionKey, new CurrentStateUpdate(key, _currentStateDelta));
}
}
catch (Exception e)
{
- logger.error("Error when updating the state ", e);
+ logger.error("Error when updating current-state ", e);
StateTransitionError error =
new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
- _stateModel.rollbackOnError(message, context, error);
+ _stateModel.rollbackOnError(message, _notificationContext, error);
_statusUpdateUtil.logError(message,
HelixStateTransitionHandler.class,
e,
- "Error when update the state ",
+ "Error when update current-state ",
accessor);
}
}
- public HelixTaskResult handleMessageInternal(Message message,
- NotificationContext context)
+ @Override
+ public HelixTaskResult handleMessage()
{
+ NotificationContext context = _notificationContext;
+ Message message = _message;
+
synchronized (_stateModel)
{
HelixTaskResult taskResult = new HelixTaskResult();
@@ -260,10 +270,11 @@ public class HelixStateTransitionHandler extends MessageHandler
accessor);
message.setExecuteStartTimeStamp(new Date().getTime());
- Exception exception = null;
+ // Exception exception = null;
try
{
- prepareMessageExecution(manager, message);
+ preHandleMessage();
+ // prepareMessageExecution(manager, message);
invoke(accessor, context, taskResult, message);
}
catch (HelixStateMismatchException e)
@@ -273,7 +284,7 @@ public class HelixStateTransitionHandler extends MessageHandler
taskResult.setSuccess(false);
taskResult.setMessage(e.toString());
taskResult.setException(e);
- exception = e;
+ // exception = e;
// return taskResult;
}
catch (Exception e)
@@ -295,9 +306,13 @@ public class HelixStateTransitionHandler extends MessageHandler
taskResult.setMessage(e.toString());
taskResult.setException(e);
taskResult.setInterrupted(e instanceof InterruptedException);
- exception = e;
+ // exception = e;
}
- postExecutionMessage(manager, message, context, taskResult, exception);
+// postExecutionMessage(manager, message, context, taskResult, exception);
+
+ // add task result to context for postHandling
+ context.add(MapKey.HELIX_TASK_RESULT.toString(), taskResult);
+ postHandleMessage();
return taskResult;
}
@@ -346,18 +361,12 @@ public class HelixStateTransitionHandler extends MessageHandler
}
@Override
- public HelixTaskResult handleMessage()
- {
- return handleMessageInternal(_message, _notificationContext);
- }
-
- @Override
public void onError(Exception e, ErrorCode code, ErrorType type)
{
// All internal error has been processed already, so we can skip them
if (type == ErrorType.INTERNAL)
{
- logger.error("Skip internal error " + e.getMessage() + " " + code);
+ logger.error("Skip internal error. errCode: " + code + ", errMsg: " + e.getMessage());
return;
}
HelixManager manager = _notificationContext.getManager();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index fd0bd8d..6b5a66e 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -26,11 +26,13 @@ import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
+import org.apache.helix.NotificationContext.MapKey;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.messaging.handling.GroupMessageHandler.GroupMessageInfo;
@@ -38,6 +40,7 @@ import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.Attributes;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.monitoring.StateTransitionContext;
import org.apache.helix.monitoring.StateTransitionDataPoint;
@@ -45,7 +48,7 @@ import org.apache.helix.util.StatusUpdateUtil;
import org.apache.log4j.Logger;
-public class HelixTask implements Callable<HelixTaskResult>
+public class HelixTask implements MessageTask
{
private static Logger logger = Logger.getLogger(HelixTask.class);
private final Message _message;
@@ -56,37 +59,10 @@ public class HelixTask implements Callable<HelixTaskResult>
HelixTaskExecutor _executor;
volatile boolean _isTimeout = false;
- public class TimeoutCancelTask extends TimerTask
- {
- HelixTaskExecutor _executor;
- Message _message;
- NotificationContext _context;
-
- public TimeoutCancelTask(HelixTaskExecutor executor,
- Message message,
- NotificationContext context)
- {
- _executor = executor;
- _message = message;
- _context = context;
- }
-
- @Override
- public void run()
- {
- _isTimeout = true;
- logger.warn("Message time out, canceling. id:" + _message.getMsgId()
- + " timeout : " + _message.getExecutionTimeout());
- _handler.onTimeout();
- _executor.cancelTask(_message, _context);
- }
-
- }
-
public HelixTask(Message message,
NotificationContext notificationContext,
MessageHandler handler,
- HelixTaskExecutor executor) throws Exception
+ HelixTaskExecutor executor)
{
this._notificationContext = notificationContext;
this._message = message;
@@ -99,30 +75,13 @@ public class HelixTask implements Callable<HelixTaskResult>
@Override
public HelixTaskResult call()
{
- // Start the timeout TimerTask, if necessary
- Timer timer = null;
- if (_message.getExecutionTimeout() > 0)
- {
- timer = new Timer(true);
- timer.schedule(new TimeoutCancelTask(_executor, _message, _notificationContext),
- _message.getExecutionTimeout());
- logger.info("Message starts with timeout " + _message.getExecutionTimeout()
- + " MsgId:" + _message.getMsgId());
- }
- else
- {
- logger.info("Message does not have timeout. MsgId:" + _message.getMsgId() + "/"
- + _message.getPartitionName());
- }
+ HelixTaskResult taskResult = null;
- HelixTaskResult taskResult = new HelixTaskResult();
-
- Exception exception = null;
- ErrorType type = ErrorType.INTERNAL;
- ErrorCode code = ErrorCode.ERROR;
+ ErrorType type = null;
+ ErrorCode code = null;
long start = System.currentTimeMillis();
- logger.info("msg:" + _message.getMsgId() + " handling task begin, at: " + start);
+ logger.info("handling task: " + getTaskId() + " begin, at: " + start);
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
_statusUpdateUtil.logInfo(_message,
HelixTask.class,
@@ -130,147 +89,133 @@ public class HelixTask implements Callable<HelixTaskResult>
accessor);
_message.setExecuteStartTimeStamp(new Date().getTime());
+ // add a concurrent map to hold currentStateUpdates for sub-messages of a batch-message
+ // partitionName -> csUpdate
+ if (_message.getBatchMessageMode() == true) {
+ _notificationContext.add(MapKey.CURRENT_STATE_UPDATE.toString(),
+ new ConcurrentHashMap<String, CurrentStateUpdate>());
+ }
+
// Handle the message
try
{
taskResult = _handler.handleMessage();
- exception = taskResult.getException();
}
catch (InterruptedException e)
{
+ taskResult = new HelixTaskResult();
+ taskResult.setException(e);
+ taskResult.setInterrupted(true);
+
_statusUpdateUtil.logError(_message,
HelixTask.class,
e,
"State transition interrupted, timeout:" + _isTimeout,
accessor);
logger.info("Message " + _message.getMsgId() + " is interrupted");
- taskResult.setInterrupted(true);
- taskResult.setException(e);
- exception = e;
}
catch (Exception e)
{
+ taskResult = new HelixTaskResult();
+ taskResult.setException(e);
+ taskResult.setMessage(e.getMessage());
+
String errorMessage =
"Exception while executing a message. " + e + " msgId: " + _message.getMsgId()
+ " type: " + _message.getMsgType();
logger.error(errorMessage, e);
_statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, accessor);
- taskResult.setSuccess(false);
- taskResult.setException(e);
- taskResult.setMessage(e.getMessage());
- exception = e;
}
- // Cancel the timer since the handling is done
- // it is fine if the TimerTask for canceling is called already
- if (timer != null)
- {
- timer.cancel();
- }
-
- if (taskResult.isSucess())
+ // cancel timeout task
+ _executor.cancelTimeoutTask(this);
+
+ Exception exception = null;
+ try
{
- _statusUpdateUtil.logInfo(_message,
+ if (taskResult.isSucess())
+ {
+ _statusUpdateUtil.logInfo(_message,
_handler.getClass(),
"Message handling task completed successfully",
accessor);
- logger.info("Message " + _message.getMsgId() + " completed.");
- }
- else if (taskResult.isInterrupted())
- {
- logger.info("Message " + _message.getMsgId() + " is interrupted");
- code = _isTimeout ? ErrorCode.TIMEOUT : ErrorCode.CANCEL;
- if (_isTimeout)
- {
- int retryCount = _message.getRetryCount();
- logger.info("Message timeout, retry count: " + retryCount + " MSGID:"
- + _message.getMsgId());
- _statusUpdateUtil.logInfo(_message,
+ logger.info("Message " + _message.getMsgId() + " completed.");
+ }
+ else {
+ type = ErrorType.INTERNAL;
+
+ if (taskResult.isInterrupted())
+ {
+ logger.info("Message " + _message.getMsgId() + " is interrupted");
+ code = _isTimeout ? ErrorCode.TIMEOUT : ErrorCode.CANCEL;
+ if (_isTimeout)
+ {
+ int retryCount = _message.getRetryCount();
+ logger.info("Message timeout, retry count: " + retryCount + " msgId:"
+ + _message.getMsgId());
+ _statusUpdateUtil.logInfo(_message,
_handler.getClass(),
"Message handling task timeout, retryCount:"
+ retryCount,
accessor);
- // Notify the handler that timeout happens, and the number of retries left
- // In case timeout happens (time out and also interrupted)
- // we should retry the execution of the message by re-schedule it in
- if (retryCount > 0)
- {
- _message.setRetryCount(retryCount - 1);
- _executor.scheduleTask(_message, _handler, _notificationContext);
- return taskResult;
- }
- }
- }
- else
- // logging for errors
- {
- String errorMsg =
- "Message execution failed. msgId: " + _message.getMsgId()
- + taskResult.getMessage();
- if (exception != null)
- {
- errorMsg += exception;
- }
- logger.error(errorMsg, exception);
- _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, accessor);
- }
-
- // Post-processing for the finished task
- try
- {
- if (!_message.getGroupMessageMode())
- {
- removeMessageFromZk(accessor, _message);
- reportMessageStat(_manager, _message, taskResult);
- sendReply(accessor, _message, taskResult);
- }
- else
- {
- GroupMessageInfo info = _executor._groupMsgHandler.onCompleteSubMessage(_message);
- if (info != null)
- {
- // TODO: changed to async update
- // group update current state
- Map<PropertyKey, CurrentState> curStateMap = info.merge();
- for (PropertyKey key : curStateMap.keySet())
- {
- accessor.updateProperty(key, curStateMap.get(key));
+ // Notify the handler that timeout happens, and the number of retries left
+ // In case timeout happens (time out and also interrupted)
+ // we should retry the execution of the message by re-schedule it in
+ if (retryCount > 0)
+ {
+ _message.setRetryCount(retryCount - 1);
+ HelixTask task = new HelixTask(_message, _notificationContext, _handler, _executor);
+ _executor.scheduleTask(task);
+ return taskResult;
+ }
+ }
}
-
- // remove group message
+ else // logging for errors
+ {
+ code = ErrorCode.ERROR;
+ String errorMsg =
+ "Message execution failed. msgId: " + getTaskId()
+ + ", errorMsg: " + taskResult.getMessage();
+ logger.error(errorMsg);
+ _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, accessor);
+ }
+ }
+
+ if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
+ // System.err.println("\t[dbg]remove msg: " + getTaskId());
removeMessageFromZk(accessor, _message);
reportMessageStat(_manager, _message, taskResult);
sendReply(accessor, _message, taskResult);
- }
+ _executor.finishTask(this);
}
- _executor.reportCompletion(_message);
}
-
- // TODO: capture errors and log here
catch (Exception e)
{
+ exception = e;
+ type = ErrorType.FRAMEWORK;
+ code = ErrorCode.ERROR;
+
String errorMessage =
"Exception after executing a message, msgId: " + _message.getMsgId() + e;
logger.error(errorMessage, e);
_statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, accessor);
- exception = e;
- type = ErrorType.FRAMEWORK;
- code = ErrorCode.ERROR;
}
- //
finally
{
long end = System.currentTimeMillis();
- logger.info("msg:" + _message.getMsgId() + " handling task completed, results:"
+ logger.info("msg: " + _message.getMsgId() + " handling task completed, results:"
+ taskResult.isSucess() + ", at: " + end + ", took:" + (end - start));
// Notify the handler about any error happened in the handling procedure, so that
// the handler have chance to finally cleanup
- if (exception != null)
+ if (type == ErrorType.INTERNAL)
{
- _handler.onError(exception, code, type);
+ _handler.onError(taskResult.getException(), code, type);
+ } else if (type == ErrorType.FRAMEWORK) {
+ _handler.onError(exception, code, type);
}
}
+
return taskResult;
}
@@ -370,4 +315,26 @@ public class HelixTask implements Callable<HelixTaskResult>
}
}
+ @Override
+ public String getTaskId()
+ {
+ return _message.getId();
+ }
+
+ @Override
+ public Message getMessage() {
+ return _message;
+ }
+
+ @Override
+ public NotificationContext getNotificationContext()
+ {
+ return _notificationContext;
+ }
+
+ @Override
+ public void onTimeout() {
+ _isTimeout = true;
+ _handler.onTimeout();
+ }
};
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index cf7ebf8..c674d69 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -28,6 +28,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@@ -46,6 +48,7 @@ import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.MessageListener;
import org.apache.helix.NotificationContext;
+import org.apache.helix.NotificationContext.MapKey;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.NotificationContext.Type;
@@ -61,14 +64,14 @@ import org.apache.helix.util.StatusUpdateUtil;
import org.apache.log4j.Logger;
-public class HelixTaskExecutor implements MessageListener
+public class HelixTaskExecutor implements MessageListener, TaskExecutor
{
// TODO: we need to further design how to throttle this.
// From storage point of view, only bootstrap case is expensive
// and we need to throttle, which is mostly IO / network bounded.
public static final int DEFAULT_PARALLEL_TASKS = 40;
// TODO: create per-task type threadpool with customizable pool size
- protected final Map<String, Future<HelixTaskResult>> _taskMap;
+ protected final Map<String, MessageTaskInfo> _taskMap;
private final Object _lock;
private final StatusUpdateUtil _statusUpdateUtil;
private final ParticipantMonitor _monitor;
@@ -78,8 +81,7 @@ public class HelixTaskExecutor implements MessageListener
final ConcurrentHashMap<String, MessageHandlerFactory> _handlerFactoryMap =
new ConcurrentHashMap<String, MessageHandlerFactory>();
- final ConcurrentHashMap<String, ExecutorService> _threadpoolMap =
- new ConcurrentHashMap<String, ExecutorService>();
+ final ConcurrentHashMap<String, ExecutorService> _executorMap;
private static Logger LOG =
Logger.getLogger(HelixTaskExecutor.class);
@@ -87,24 +89,31 @@ public class HelixTaskExecutor implements MessageListener
Map<String, Integer> _resourceThreadpoolSizeMap =
new ConcurrentHashMap<String, Integer>();
- final GroupMessageHandler _groupMsgHandler;
+ // timer for schedule timeout tasks
+ final Timer _timer;
+
public HelixTaskExecutor()
{
- _taskMap = new ConcurrentHashMap<String, Future<HelixTaskResult>>();
- _groupMsgHandler = new GroupMessageHandler();
+ _taskMap = new ConcurrentHashMap<String, MessageTaskInfo>();
+ _executorMap = new ConcurrentHashMap<String, ExecutorService>();
_lock = new Object();
_statusUpdateUtil = new StatusUpdateUtil();
_monitor = new ParticipantMonitor();
+
+ _timer = new Timer(true); // created as a daemon timer thread to handle task timeout
+
startMonitorThread();
}
+ @Override
public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory)
{
registerMessageHandlerFactory(type, factory, DEFAULT_PARALLEL_TASKS);
}
+ @Override
public void registerMessageHandlerFactory(String type,
MessageHandlerFactory factory,
int threadpoolSize)
@@ -118,13 +127,16 @@ public class HelixTaskExecutor implements MessageListener
}
_handlerFactoryMap.put(type, factory);
- _threadpoolMap.put(type, Executors.newFixedThreadPool(threadpoolSize));
- LOG.info("Adding msg factory for type " + type + " threadpool size "
+ ExecutorService executorSvc = Executors.newFixedThreadPool(threadpoolSize);
+ _executorMap.put(type, executorSvc);
+
+ LOG.info("Added msg-factory for type: " + type + ", threadpool size "
+ threadpoolSize);
}
else
{
- LOG.error("Ignoring duplicate msg handler factory for type " + type);
+ LOG.warn("Fail to register msg-handler-factory for type: " + type
+ + ", pool-size: " + threadpoolSize + ", factory: " + factory);
}
}
@@ -167,9 +179,9 @@ public class HelixTaskExecutor implements MessageListener
if (threadpoolSize > 0)
{
String key = MessageType.STATE_TRANSITION.toString() + "." + resourceName;
- _threadpoolMap.put(key, Executors.newFixedThreadPool(threadpoolSize));
- LOG.info("Adding per resource threadpool for resource " + resourceName
- + " with size " + threadpoolSize);
+ _executorMap.put(key, Executors.newFixedThreadPool(threadpoolSize));
+ LOG.info("Added per resource threadpool for resource: " + resourceName
+ + " with size: " + threadpoolSize);
}
_resourceThreadpoolSizeMap.put(resourceName, threadpoolSize);
}
@@ -182,38 +194,81 @@ public class HelixTaskExecutor implements MessageListener
**/
ExecutorService findExecutorServiceForMsg(Message message)
{
- ExecutorService executorService = _threadpoolMap.get(message.getMsgType());
+ ExecutorService executorService = _executorMap.get(message.getMsgType());
if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
{
String resourceName = message.getResourceName();
if (resourceName != null)
{
String key = message.getMsgType() + "." + resourceName;
- if (_threadpoolMap.containsKey(key))
+ if (_executorMap.containsKey(key))
{
- LOG.info("Find per-resource thread pool with key " + key);
- executorService = _threadpoolMap.get(key);
+ LOG.info("Find per-resource thread pool with key: " + key);
+ executorService = _executorMap.get(key);
}
}
}
return executorService;
}
- public void scheduleTask(Message message,
- MessageHandler handler,
- NotificationContext notificationContext)
+ // ExecutorService impl's in JDK are thread-safe
+ @Override
+ public List<Future<HelixTaskResult>> invokeAllTasks(List<MessageTask> tasks, long timeout, TimeUnit unit) throws InterruptedException
{
- assert (handler != null);
- synchronized (_lock)
- {
- try
- {
- String taskId = message.getMsgId() + "/" + message.getPartitionName();
+ if (tasks == null || tasks.size() == 0) {
+ return null;
+ }
+
+ // check all tasks use the same executor-service
+ ExecutorService exeSvc = findExecutorServiceForMsg(tasks.get(0).getMessage());
+ for (int i = 1; i < tasks.size(); i++) {
+ MessageTask task = tasks.get(i);
+ ExecutorService curExeSvc = findExecutorServiceForMsg(task.getMessage());
+ if (curExeSvc != exeSvc) {
+ LOG.error("Fail to invoke all tasks because they are not using the same executor-service");
+ return null;
+ }
+ }
+
+ // TODO: check if any of the task has already been scheduled
+
+ // this is a blocking call
+ List<Future<HelixTaskResult>> futures = exeSvc.invokeAll(tasks, timeout, unit);
+
+ return futures;
+ }
+
+ @Override
+ public boolean cancelTimeoutTask(MessageTask task)
+ {
+ synchronized(_lock) {
+ String taskId = task.getTaskId();
+ if (_taskMap.containsKey(taskId)) {
+ MessageTaskInfo info = _taskMap.get(taskId);
+ if (info._timerTask != null) {
+ info._timerTask.cancel();
+ }
+ return true;
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public boolean scheduleTask(MessageTask task)
+ {
+ String taskId = task.getTaskId();
+ Message message = task.getMessage();
+ NotificationContext notificationContext = task.getNotificationContext();
+
+ try
+ {
if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
{
checkResourceConfig(message.getResourceName(), notificationContext.getManager());
}
+
LOG.info("Scheduling message: " + taskId);
// System.out.println("sched msg: " + message.getPartitionName() + "-"
// + message.getTgtName() + "-" + message.getFromState() + "-"
@@ -223,28 +278,48 @@ public class HelixTaskExecutor implements MessageListener
HelixTaskExecutor.class,
"Message handling task scheduled",
notificationContext.getManager().getHelixDataAccessor());
-
- HelixTask task = new HelixTask(message, notificationContext, handler, this);
- if (!_taskMap.containsKey(taskId))
+
+ // this sync guarantees that ExecutorService.submit() task and put taskInfo into map are sync'ed
+ synchronized (_lock)
{
- LOG.info("Message:" + taskId + " handling task scheduled");
- Future<HelixTaskResult> future =
- findExecutorServiceForMsg(message).submit(task);
- _taskMap.put(taskId, future);
- }
- else
- {
- _statusUpdateUtil.logWarning(message,
+ if (!_taskMap.containsKey(taskId))
+ {
+ ExecutorService exeSvc = findExecutorServiceForMsg(message);
+ Future<HelixTaskResult> future = exeSvc.submit(task);
+
+ TimerTask timerTask = null;
+ if (message.getExecutionTimeout() > 0)
+ {
+ timerTask = new MessageTimeoutTask(this, task);
+ _timer.schedule(timerTask, message.getExecutionTimeout());
+ LOG.info("Message starts with timeout " + message.getExecutionTimeout()
+ + " MsgId: " + task.getTaskId());
+ }
+ else
+ {
+ LOG.debug("Message does not have timeout. MsgId: " + task.getTaskId());
+ }
+
+ _taskMap.put(taskId, new MessageTaskInfo(task, future, timerTask));
+
+ LOG.info("Message: " + taskId + " handling task scheduled");
+
+ return true;
+ }
+ else
+ {
+ _statusUpdateUtil.logWarning(message,
HelixTaskExecutor.class,
"Message handling task already sheduled for "
+ taskId,
notificationContext.getManager()
.getHelixDataAccessor());
+ }
}
}
catch (Exception e)
{
- LOG.error("Error while executing task." + message, e);
+ LOG.error("Error while executing task. " + message, e);
_statusUpdateUtil.logError(message,
HelixTaskExecutor.class,
@@ -253,67 +328,87 @@ public class HelixTaskExecutor implements MessageListener
notificationContext.getManager()
.getHelixDataAccessor());
}
- }
+ return false;
}
- public void cancelTask(Message message, NotificationContext notificationContext)
+ @Override
+ public boolean cancelTask(MessageTask task)
{
- synchronized (_lock)
- {
- String taskId = message.getMsgId() + "/" + message.getPartitionName();
+ Message message = task.getMessage();
+ NotificationContext notificationContext = task.getNotificationContext();
+ String taskId = task.getTaskId();
- if (_taskMap.containsKey(taskId))
+ synchronized(_lock)
{
- _statusUpdateUtil.logInfo(message,
+ if (_taskMap.containsKey(taskId))
+ {
+ MessageTaskInfo taskInfo = _taskMap.get(taskId);
+ // cancel timeout task
+ if (taskInfo._timerTask != null) {
+ taskInfo._timerTask.cancel();
+ }
+
+ // cancel task
+ Future<HelixTaskResult> future = taskInfo.getFuture();
+
+ _statusUpdateUtil.logInfo(message,
HelixTaskExecutor.class,
- "Trying to cancel the future for " + taskId,
+ "Canceling task: " + taskId,
notificationContext.getManager().getHelixDataAccessor());
- Future<HelixTaskResult> future = _taskMap.get(taskId);
- // If the thread is still running it will be interrupted if cancel(true)
- // is called. So state transition callbacks should implement logic to
- // return
- // if it is interrupted.
- if (future.cancel(true))
- {
- _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceled "
+ // If the thread is still running it will be interrupted if cancel(true)
+ // is called. So state transition callbacks should implement logic to
+ // return
+ // if it is interrupted.
+ if (future.cancel(true))
+ {
+ _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceled task: "
+ taskId, notificationContext.getManager().getHelixDataAccessor());
- _taskMap.remove(taskId);
- }
- else
- {
- _statusUpdateUtil.logInfo(message,
+ _taskMap.remove(taskId);
+ return true;
+ }
+ else
+ {
+ _statusUpdateUtil.logInfo(message,
HelixTaskExecutor.class,
- "false when trying to cancel the message " + taskId,
+ "fail to cancel task: " + taskId,
notificationContext.getManager()
.getHelixDataAccessor());
+ }
}
- }
- else
- {
- _statusUpdateUtil.logWarning(message,
+ else
+ {
+ _statusUpdateUtil.logWarning(message,
HelixTaskExecutor.class,
- "Future not found when trying to cancel " + taskId,
+ "fail to cancel task: " + taskId + ", future not found",
notificationContext.getManager()
.getHelixDataAccessor());
+ }
}
- }
+ return false;
}
- protected void reportCompletion(Message message) // String msgId)
+ @Override
+ public void finishTask(MessageTask task)
{
+ Message message = task.getMessage();
+ String taskId = task.getTaskId();
+ LOG.info("message finished: " + taskId + ", took "
+ + (new Date().getTime() - message.getExecuteStartTimeStamp()));
+
synchronized (_lock)
{
- String taskId = message.getMsgId() + "/" + message.getPartitionName();
- LOG.info("message finished: " + taskId + ", took "
- + (new Date().getTime() - message.getExecuteStartTimeStamp()));
if (_taskMap.containsKey(taskId))
{
- _taskMap.remove(taskId);
+ MessageTaskInfo info = _taskMap.remove(taskId);
+ if (info._timerTask != null) {
+ // ok to cancel multiple times
+ info._timerTask.cancel();
+ }
}
else
{
- LOG.warn("message " + taskId + "not found in task map");
+ LOG.warn("message " + taskId + " not found in task map");
}
}
}
@@ -346,22 +441,18 @@ public class HelixTaskExecutor implements MessageListener
{
factory.reset();
}
- // Cancel all scheduled future
- // synchronized (_lock)
+ // Cancel all scheduled tasks
+ synchronized (_lock)
{
- for (Future<HelixTaskResult> f : _taskMap.values())
- {
- f.cancel(true);
- }
+ for (MessageTaskInfo info : _taskMap.values())
+ {
+ cancelTask(info._task);
+ }
_taskMap.clear();
}
return;
}
- HelixManager manager = changeContext.getManager();
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
if (messages == null || messages.size() == 0)
{
LOG.info("No Messages to process");
@@ -371,6 +462,10 @@ public class HelixTaskExecutor implements MessageListener
// sort message by creation timestamp, so message created earlier is processed first
Collections.sort(messages, Message.CREATE_TIME_COMPARATOR);
+ HelixManager manager = changeContext.getManager();
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
// message handlers created
List<MessageHandler> handlers = new ArrayList<MessageHandler>();
@@ -384,7 +479,6 @@ public class HelixTaskExecutor implements MessageListener
List<CurrentState> metaCurStates = new ArrayList<CurrentState>();
Set<String> createCurStateNames = new HashSet<String>();
- changeContext.add(NotificationContext.TASK_EXECUTOR_KEY, this);
for (Message message : messages)
{
// nop messages are simply removed. It is used to trigger onMessage() in
@@ -399,7 +493,7 @@ public class HelixTaskExecutor implements MessageListener
String tgtSessionId = message.getTgtSessionId();
- // if sessionId not match, remove it
+ // sessionId mismatch normally means message comes from expired session, just remove it
if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*"))
{
String warningMessage =
@@ -422,20 +516,21 @@ public class HelixTaskExecutor implements MessageListener
// read. Instead we keep it until the current state is updated.
// We will read the message again if there is a new message but we
// check for the status and ignore if its already read
- LOG.trace("Message already read. mid: " + message.getMsgId());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Message already read. msgId: " + message.getMsgId());
+ }
continue;
}
// create message handlers, if handlers not found, leave its state as NEW
try
{
- List<MessageHandler> createHandlers =
- createMessageHandlers(message, changeContext);
- if (createHandlers.isEmpty())
+ MessageHandler createHandler = createMessageHandler(message, changeContext);
+ if (createHandler == null)
{
continue;
}
- handlers.addAll(createHandlers);
+ handlers.add(createHandler);
}
catch (Exception e)
{
@@ -450,10 +545,10 @@ public class HelixTaskExecutor implements MessageListener
error,
accessor);
- // Mark message state UNPROCESSABLE if we hit an exception in creating
- // message handler. The message will stay on zookeeper but will not be processed
message.setMsgState(MessageState.UNPROCESSABLE);
- accessor.updateProperty(message.getKey(keyBuilder, instanceName), message);
+ accessor.removeProperty(message.getKey(keyBuilder, instanceName));
+ LOG.error("Message cannot be proessed: " + message.getRecord(), e);
+
continue;
}
@@ -487,7 +582,7 @@ public class HelixTaskExecutor implements MessageListener
metaCurState.setBucketSize(message.getBucketSize());
metaCurState.setStateModelDefRef(message.getStateModelDef());
metaCurState.setSessionId(sessionId);
- metaCurState.setGroupMessageMode(message.getGroupMessageMode());
+ metaCurState.setBatchMessageMode(message.getBatchMessageMode());
String ftyName = message.getStateModelFactoryName();
if (ftyName != null)
{
@@ -512,7 +607,7 @@ public class HelixTaskExecutor implements MessageListener
}
catch (Exception e)
{
- LOG.error(e);
+ LOG.error("fail to create cur-state znodes for messages: " + readMsgs, e);
}
}
@@ -523,7 +618,8 @@ public class HelixTaskExecutor implements MessageListener
for (MessageHandler handler : handlers)
{
- scheduleTask(handler._message, handler, changeContext);
+ HelixTask task = new HelixTask(handler._message, changeContext, handler, this);
+ scheduleTask(task);
}
}
}
@@ -540,70 +636,35 @@ public class HelixTaskExecutor implements MessageListener
// the corresponding MessageHandlerFactory is registered
if (handlerFactory == null)
{
- LOG.warn("Fail to find message handler factory for type: " + msgType + " mid:"
+ LOG.warn("Fail to find message handler factory for type: " + msgType + " msgId: "
+ message.getMsgId());
return null;
}
+ // pass the executor to msg-handler since batch-msg-handler needs task-executor to schedule sub-msgs
+ changeContext.add(MapKey.TASK_EXECUTOR.toString(), this);
return handlerFactory.createHandler(message, changeContext);
}
- private List<MessageHandler> createMessageHandlers(Message message,
- NotificationContext changeContext)
- {
- List<MessageHandler> handlers = new ArrayList<MessageHandler>();
- if (!message.getGroupMessageMode())
- {
- LOG.info("Creating handler for message " + message.getMsgId() + "/"
- + message.getPartitionName());
-
- MessageHandler handler = createMessageHandler(message, changeContext);
-
- if (handler != null)
- {
- handlers.add(handler);
- }
- }
- else
- {
- _groupMsgHandler.put(message);
-
- List<String> partitionNames = message.getPartitionNames();
- for (String partitionName : partitionNames)
- {
- Message subMsg = new Message(message.getRecord());
- subMsg.setPartitionName(partitionName);
- subMsg.setAttribute(Attributes.PARENT_MSG_ID, message.getId());
-
- LOG.info("Creating handler for group message " + subMsg.getMsgId() + "/"
- + partitionName);
- MessageHandler handler = createMessageHandler(subMsg, changeContext);
- if (handler != null)
- {
- handlers.add(handler);
- }
- }
- }
-
- return handlers;
- }
-
- public void shutDown()
+ @Override
+ public void shutdown()
{
LOG.info("shutting down TaskExecutor");
+ _timer.cancel();
+
synchronized (_lock)
{
- for (String msgType : _threadpoolMap.keySet())
+ for (String msgType : _executorMap.keySet())
{
- List<Runnable> tasksLeft = _threadpoolMap.get(msgType).shutdownNow();
+ List<Runnable> tasksLeft = _executorMap.get(msgType).shutdownNow();
LOG.info(tasksLeft.size() + " tasks are still in the threadpool for msgType "
+ msgType);
}
- for (String msgType : _threadpoolMap.keySet())
+ for (String msgType : _executorMap.keySet())
{
try
{
- if (!_threadpoolMap.get(msgType).awaitTermination(200, TimeUnit.MILLISECONDS))
+ if (!_executorMap.get(msgType).awaitTermination(200, TimeUnit.MILLISECONDS))
{
LOG.warn(msgType + " is not fully termimated in 200 MS");
System.out.println(msgType + " is not fully termimated in 200 MS");
@@ -618,25 +679,4 @@ public class HelixTaskExecutor implements MessageListener
_monitor.shutDown();
LOG.info("shutdown finished");
}
-
- // TODO: remove this
- public static void main(String[] args) throws Exception
- {
- ExecutorService pool = Executors.newFixedThreadPool(DEFAULT_PARALLEL_TASKS);
- Future<HelixTaskResult> future;
- future = pool.submit(new Callable<HelixTaskResult>()
- {
-
- @Override
- public HelixTaskResult call() throws Exception
- {
- System.out.println("CMTaskExecutor.main(...).new Callable() {...}.call()");
- return null;
- }
-
- });
- future = pool.submit(new HelixTask(null, null, null, null));
- Thread.currentThread().join();
- System.out.println(future.isDone());
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTask.java
new file mode 100644
index 0000000..083d903
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTask.java
@@ -0,0 +1,36 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Callable;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+
+public interface MessageTask extends Callable<HelixTaskResult> {
+ String getTaskId();
+
+ Message getMessage();
+
+ NotificationContext getNotificationContext();
+
+ void onTimeout();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTaskInfo.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTaskInfo.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTaskInfo.java
new file mode 100644
index 0000000..652ca21
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTaskInfo.java
@@ -0,0 +1,52 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.TimerTask;
+import java.util.concurrent.Future;
+
+/**
+ * Bookkeeping for a scheduled {@link MessageTask}: the task itself, the {@link Future} tracking its
+ * result, and an associated {@link TimerTask} (NOTE(review): presumably the task's timeout timer,
+ * possibly null when no timeout applies; confirm with callers).
+ */
+public class MessageTaskInfo {
+  final MessageTask _task;
+  final Future<HelixTaskResult> _future;
+  final TimerTask _timerTask;
+
+  /**
+   * @param task the scheduled task
+   * @param future handle for the task's pending result
+   * @param timerTask timer task associated with the scheduled task
+   */
+  public MessageTaskInfo(MessageTask task, Future<HelixTaskResult> future, TimerTask timerTask) {
+    _timerTask = timerTask;
+    _future = future;
+    _task = task;
+  }
+
+  /**
+   * @return handle for the task's pending result
+   */
+  public Future<HelixTaskResult> getFuture() {
+    return _future;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTimeoutTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTimeoutTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTimeoutTask.java
new file mode 100644
index 0000000..c89a7be
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageTimeoutTask.java
@@ -0,0 +1,73 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.TimerTask;
+
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+
+/**
+ * Timer task guarding the execution timeout of a {@link MessageTask}. When run, it logs the
+ * timeout, lets the task react via {@link MessageTask#onTimeout()}, and asks the executor to
+ * cancel the task.
+ */
+public class MessageTimeoutTask extends TimerTask {
+  private static final Logger LOG = Logger.getLogger(MessageTimeoutTask.class);
+
+  final HelixTaskExecutor _executor;
+  final MessageTask _task;
+
+  /**
+   * @param executor executor asked to cancel {@code task} when this timer task runs
+   * @param task the task whose timeout this timer task handles
+   */
+  public MessageTimeoutTask(HelixTaskExecutor executor, MessageTask task)
+  {
+    _executor = executor;
+    _task = task;
+  }
+
+  /**
+   * Logs the timeout, runs {@link MessageTask#onTimeout()}, then cancels the task through the
+   * executor. Cancellation is attempted even if {@code onTimeout()} fails, and no exception is
+   * allowed to escape: if this task runs on a {@link java.util.Timer}, an exception thrown from
+   * {@code run()} kills the timer's thread, after which no other scheduled timeout would fire.
+   */
+  @Override
+  public void run() {
+    Message message = _task.getMessage();
+    LOG.warn("Message time out, canceling. id:" + message.getMsgId() + " timeout : "
+        + message.getExecutionTimeout());
+
+    try {
+      _task.onTimeout();
+    } catch (RuntimeException e) {
+      // keep going: the timed-out task must still be canceled even if its hook fails
+      LOG.error("Fail to run onTimeout for message id:" + message.getMsgId(), e);
+    }
+
+    try {
+      _executor.cancelTask(_task);
+    } catch (RuntimeException e) {
+      LOG.error("Fail to cancel timed-out message id:" + message.getMsgId(), e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java
new file mode 100644
index 0000000..b038e32
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java
@@ -0,0 +1,106 @@
+package org.apache.helix.messaging.handling;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Executor for {@link MessageTask}s: per-message-type handler-factory registration, task
+ * scheduling, blocking bulk execution, cancellation and timeout management.
+ */
+public interface TaskExecutor {
+  /**
+   * Default degree of task parallelism (NOTE(review): presumably the thread-pool size used by
+   * {@link #registerMessageHandlerFactory(String, MessageHandlerFactory)}; confirm in the
+   * implementation).
+   */
+  int DEFAULT_PARALLEL_TASKS = 40;
+
+  /**
+   * Registers the factory that creates handlers for messages of {@code type}.
+   *
+   * @param type message type handled by {@code factory}
+   * @param factory factory producing handlers for messages of {@code type}
+   */
+  void registerMessageHandlerFactory(String type, MessageHandlerFactory factory);
+
+  /**
+   * Registers the factory that creates handlers for messages of {@code type}, using a thread pool
+   * of the given size for those messages.
+   *
+   * @param type message type handled by {@code factory}
+   * @param factory factory producing handlers for messages of {@code type}
+   * @param threadPoolSize size of the thread pool for messages of {@code type}
+   */
+  void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
+      int threadPoolSize);
+
+  /**
+   * Schedules a task for execution.
+   *
+   * @param task the task to execute
+   * @return whether the task was scheduled (NOTE(review): confirm when {@code false} is returned)
+   */
+  boolean scheduleTask(MessageTask task);
+
+  /**
+   * Executes all given tasks and blocks the caller until they finish or {@code timeout} elapses
+   * (NOTE(review): semantics inferred from the signature; confirm in the implementation).
+   *
+   * @param tasks the tasks to execute
+   * @param timeout maximum time to wait
+   * @param unit time unit of {@code timeout}
+   * @return one future per task (NOTE(review): presumably in the order of {@code tasks}, as with
+   *         {@code ExecutorService.invokeAll}; confirm)
+   * @throws InterruptedException if interrupted while waiting
+   */
+  List<Future<HelixTaskResult>> invokeAllTasks(List<MessageTask> tasks, long timeout,
+      TimeUnit unit) throws InterruptedException;
+
+  /**
+   * Cancels execution of a task.
+   *
+   * @param task the task to cancel
+   * @return whether the task was canceled (NOTE(review): confirm exact semantics)
+   */
+  boolean cancelTask(MessageTask task);
+
+  /**
+   * Cancels the timeout registered for the given task.
+   *
+   * @param task the task whose timeout is canceled
+   * @return whether a timeout was canceled (NOTE(review): confirm exact semantics)
+   */
+  boolean cancelTimeoutTask(MessageTask task);
+
+  /**
+   * Marks execution of a task as finished.
+   *
+   * @param task the finished task
+   */
+  void finishTask(MessageTask task);
+
+  /**
+   * Shuts down this executor.
+   */
+  void shutdown();
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 90cb95a..8de9a53 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -29,10 +29,13 @@ import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
+import org.apache.helix.NotificationContext.MapKey;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.messaging.handling.BatchMessageHandler;
import org.apache.helix.messaging.handling.HelixStateTransitionHandler;
import org.apache.helix.messaging.handling.HelixTaskExecutor;
import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.TaskExecutor;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
@@ -49,12 +52,20 @@ public class HelixStateMachineEngine implements StateMachineEngine
// StateModelName->FactoryName->StateModelFactory
private final Map<String, Map<String, StateModelFactory<? extends StateModel>>> _stateModelFactoryMap;
- StateModelParser _stateModelParser;
-
+ private final StateModelParser _stateModelParser;
private final HelixManager _manager;
-
private final ConcurrentHashMap<String, StateModelDefinition> _stateModelDefs;
+  /** Creates an engine for {@code manager} with empty factory and state-model-definition maps. */
+  public HelixStateMachineEngine(HelixManager manager)
+  {
+    _manager = manager;
+    _stateModelParser = new StateModelParser();
+    _stateModelDefs = new ConcurrentHashMap<String, StateModelDefinition>();
+    _stateModelFactoryMap =
+        new ConcurrentHashMap<String, Map<String, StateModelFactory<? extends StateModel>>>();
+  }
+
public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName)
{
return getStateModelFactory(stateModelName,
@@ -71,16 +82,6 @@ public class HelixStateMachineEngine implements StateMachineEngine
return _stateModelFactoryMap.get(stateModelName).get(factoryName);
}
- public HelixStateMachineEngine(HelixManager manager)
- {
- _stateModelParser = new StateModelParser();
- _manager = manager;
-
- _stateModelFactoryMap =
- new ConcurrentHashMap<String, Map<String, StateModelFactory<? extends StateModel>>>();
- _stateModelDefs = new ConcurrentHashMap<String, StateModelDefinition>();
- }
-
@Override
public boolean registerStateModelFactory(String stateModelDef,
StateModelFactory<? extends StateModel> factory)
@@ -191,8 +192,8 @@ public class HelixStateMachineEngine implements StateMachineEngine
if (!type.equals(MessageType.STATE_TRANSITION.toString()))
{
- throw new HelixException("Unexpected msg type for message " + message.getMsgId()
- + " type:" + message.getMsgType());
+ throw new HelixException("Expect state-transition message type, but was "
+ + message.getMsgType() + ", msgId: " + message.getMsgId());
}
String partitionKey = message.getPartitionName();
@@ -203,7 +204,7 @@ public class HelixStateMachineEngine implements StateMachineEngine
if (stateModelName == null)
{
- logger.error("message does not contain stateModelDef");
+ logger.error("Fail to create msg-handler because message does not contain stateModelDef. msgId: " + message.getId());
return null;
}
@@ -213,12 +214,12 @@ public class HelixStateMachineEngine implements StateMachineEngine
factoryName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
}
- StateModelFactory stateModelFactory =
+ StateModelFactory<? extends StateModel> stateModelFactory =
getStateModelFactory(stateModelName, factoryName);
if (stateModelFactory == null)
{
- logger.warn("Cannot find stateModelFactory for model:" + stateModelName
- + " using factoryName:" + factoryName + " for resourceGroup:" + resourceName);
+ logger.warn("Fail to create msg-handler because cannot find stateModelFactory for model: " + stateModelName
+ + " using factoryName: " + factoryName + " for resource: " + resourceName);
return null;
}
@@ -231,39 +232,48 @@ public class HelixStateMachineEngine implements StateMachineEngine
accessor.getProperty(keyBuilder.stateModelDef(stateModelName));
if (stateModelDef == null)
{
- throw new HelixException("stateModelDef for " + stateModelName
- + " does NOT exists");
+ throw new HelixException("fail to create msg-handler because stateModelDef for " + stateModelName
+ + " does NOT exist");
}
_stateModelDefs.put(stateModelName, stateModelDef);
}
- // create currentStateDelta for this partition
- String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
- StateModel stateModel = stateModelFactory.getStateModel(partitionKey);
- if (stateModel == null)
- {
- stateModelFactory.createAndAddStateModel(partitionKey);
- stateModel = stateModelFactory.getStateModel(partitionKey);
- stateModel.updateState(initState);
- }
+ if (message.getBatchMessageMode() == false) {
+ // create currentStateDelta for this partition
+ String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
+ StateModel stateModel = stateModelFactory.getStateModel(partitionKey);
+ if (stateModel == null)
+ {
+ stateModel = stateModelFactory.createAndAddStateModel(partitionKey);
+ stateModel.updateState(initState);
+ }
- CurrentState currentStateDelta = new CurrentState(resourceName);
- currentStateDelta.setSessionId(sessionId);
- currentStateDelta.setStateModelDefRef(stateModelName);
- currentStateDelta.setStateModelFactoryName(factoryName);
- currentStateDelta.setBucketSize(bucketSize);
-
- currentStateDelta.setState(partitionKey, (stateModel.getCurrentState() == null)
- ? initState : stateModel.getCurrentState());
-
- HelixTaskExecutor executor = (HelixTaskExecutor) context.get(NotificationContext.TASK_EXECUTOR_KEY);
-
- return new HelixStateTransitionHandler(stateModel,
- message,
- context,
- currentStateDelta,
- executor);
- }
+ // TODO: move currentStateDelta to StateTransitionMsgHandler
+ CurrentState currentStateDelta = new CurrentState(resourceName);
+ currentStateDelta.setSessionId(sessionId);
+ currentStateDelta.setStateModelDefRef(stateModelName);
+ currentStateDelta.setStateModelFactoryName(factoryName);
+ currentStateDelta.setBucketSize(bucketSize);
+
+ currentStateDelta.setState(partitionKey, (stateModel.getCurrentState() == null)
+ ? initState : stateModel.getCurrentState());
+
+ return new HelixStateTransitionHandler(stateModel,
+ message,
+ context,
+ currentStateDelta);
+ } else
+ {
+ // get executor-service for the message
+ TaskExecutor executor = (TaskExecutor) context.get(MapKey.TASK_EXECUTOR.toString());
+ if (executor == null)
+ {
+ logger.error("fail to get executor-service for batch message: " + message.getId()
+ + ". msgType: " + message.getMsgType() + ", resource: " + message.getResourceName());
+ return null;
+ }
+ return new BatchMessageHandler(message, context, this, executor);
+ } }
@Override
public String getMessageType()
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/a1bf1244/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
index 821127c..8207052 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
@@ -51,9 +51,11 @@ public abstract class StateModelFactory<T extends StateModel>
*
* @param partitionName
*/
- public void createAndAddStateModel(String partitionName)
+ public T createAndAddStateModel(String partitionName) // stores createNewStateModel(partitionName) under partitionName and returns it
{
- _stateModelMap.put(partitionName, createNewStateModel(partitionName));
+ T stateModel = createNewStateModel(partitionName);
+ _stateModelMap.put(partitionName, stateModel);
+ return stateModel; // lets callers use the new model without a second getStateModel() lookup
}
/**