You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 01:14:58 UTC
[16/42] Refactoring the package names and removing jsql parser
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/manager/zk/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/package-info.java b/helix-core/src/main/java/org/apache/helix/manager/zk/package-info.java
new file mode 100644
index 0000000..08516e8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * ZooKeeper-based implementation of the Helix cluster manager.
+ *
+ */
+package org.apache.helix.manager.zk;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java b/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java
new file mode 100644
index 0000000..5a520a9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/AsyncCallback.java
@@ -0,0 +1,168 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Collects the replies to a batch of sent messages and lets a caller wait until either every
+ * message has been answered or an optional timeout has fired.
+ * <p>
+ * {@link #onReply(Message)} and the timeout task both synchronize on this object and call
+ * {@code notifyAll()} on it, so a waiter must hold this object's monitor while it checks
+ * {@link #isDone()} / {@link #isTimedOut()} and waits.
+ */
+public abstract class AsyncCallback
+{
+
+  private static Logger _logger = Logger.getLogger(AsyncCallback.class);
+  // Time (ms since epoch) at which startTimer() first ran; 0 until then.
+  long _startTimeStamp = 0;
+  // Timeout in ms; a timer is only scheduled when this is > 0.
+  protected long _timeout = -1;
+  Timer _timer = null;
+  List<Message> _messagesSent;
+  protected final List<Message> _messageReplied = new ArrayList<Message>();
+  // volatile: these flags are written by one thread (timer / waiter) and may be read by
+  // another thread that does not hold the monitor.
+  volatile boolean _timedOut = false;
+  volatile boolean _isInterrupted = false;
+
+  /**
+   * Creates a callback with the given timeout.
+   *
+   * @param timeout timeout in milliseconds; values &lt;= 0 mean no timer is scheduled
+   */
+  public AsyncCallback(long timeout)
+  {
+    _logger.info("Setting time out to " + timeout + " ms");
+    _timeout = timeout;
+  }
+
+  /**
+   * Creates a callback without a timeout.
+   */
+  public AsyncCallback()
+  {
+    this(-1);
+  }
+
+  /**
+   * Replaces the timeout. Only has an effect on timers started afterwards.
+   *
+   * @param timeout timeout in milliseconds; values &lt;= 0 mean no timer is scheduled
+   */
+  public final void setTimeout(long timeout)
+  {
+    _logger.info("Setting time out to " + timeout + " ms");
+    _timeout = timeout;
+  }
+
+  /**
+   * @return the live list of replies received so far (not a copy)
+   */
+  public List<Message> getMessageReplied()
+  {
+    return _messageReplied;
+  }
+
+  /**
+   * @return the value last passed to {@link #setInterrupted(boolean)}, {@code false} initially
+   */
+  public boolean isInterrupted()
+  {
+    return _isInterrupted;
+  }
+
+  /**
+   * Records whether the thread waiting on this callback was interrupted.
+   *
+   * @param b the new interrupted flag
+   */
+  public void setInterrupted(boolean b)
+  {
+    // Previously the argument was ignored and the flag was always set to true.
+    _isInterrupted = b;
+  }
+
+  /**
+   * Records a reply and forwards it to {@link #onReplyMessage(Message)} unless the callback is
+   * already done. Once done, the timer (if any) is cancelled and all waiters are notified.
+   *
+   * @param message the reply message
+   */
+  public synchronized final void onReply(Message message)
+  {
+    _logger.info("OnReply msg " + message.getMsgId());
+    if (!isDone())
+    {
+      _messageReplied.add(message);
+      try
+      {
+        onReplyMessage(message);
+      }
+      catch (Exception e)
+      {
+        // A failing user hook must not prevent completion bookkeeping below.
+        _logger.error("Exception in onReplyMessage for msg " + message.getMsgId(), e);
+      }
+    }
+    if (isDone())
+    {
+      if (_timer != null)
+      {
+        _timer.cancel();
+      }
+      notifyAll();
+    }
+  }
+
+  /**
+   * Default implementation will wait until every message sent gets a response.
+   *
+   * @return {@code true} when the number of replies equals the number of messages sent;
+   *         {@code false} while no sent-message list has been set yet
+   */
+  public boolean isDone()
+  {
+    List<Message> sent = _messagesSent;
+    return sent != null && _messageReplied.size() == sent.size();
+  }
+
+  /**
+   * @return {@code true} once the timeout task has fired before completion
+   */
+  public boolean isTimedOut()
+  {
+    return _timedOut;
+  }
+
+  final void setMessagesSent(List<Message> generatedMessage)
+  {
+    _messagesSent = generatedMessage;
+  }
+
+  /**
+   * Schedules the one-shot timeout task if a positive timeout is set and no timer exists yet.
+   * Synchronized so that {@link #onReply(Message)} reliably sees the timer it has to cancel.
+   */
+  final synchronized void startTimer()
+  {
+    if (_timer != null || _timeout <= 0)
+    {
+      return;
+    }
+    if (_startTimeStamp == 0)
+    {
+      _startTimeStamp = System.currentTimeMillis();
+    }
+    if (isDone())
+    {
+      // Every reply already arrived before the timer was started; nothing left to time out.
+      return;
+    }
+    _timer = new Timer(true);
+    _timer.schedule(new TimeoutTask(this), _timeout);
+  }
+
+  /**
+   * Invoked (while holding this callback's monitor) when the timeout fires before completion.
+   */
+  public abstract void onTimeOut();
+
+  /**
+   * Invoked (while holding this callback's monitor) for every reply accepted by
+   * {@link #onReply(Message)}.
+   *
+   * @param message the reply message
+   */
+  public abstract void onReplyMessage(Message message);
+
+  /**
+   * One-shot task that marks the callback as timed out and wakes up waiters.
+   */
+  class TimeoutTask extends TimerTask
+  {
+    AsyncCallback _callback;
+
+    public TimeoutTask(AsyncCallback asyncCallback)
+    {
+      _callback = asyncCallback;
+    }
+
+    @Override
+    public void run()
+    {
+      try
+      {
+        synchronized (_callback)
+        {
+          // The last reply may have completed the callback while this task was waiting for
+          // the monitor; Timer.cancel() does not stop a task that has already started.
+          if (_callback.isDone())
+          {
+            return;
+          }
+          _callback._timedOut = true;
+          _callback.notifyAll();
+          _callback.onTimeOut();
+        }
+      }
+      catch (Exception e)
+      {
+        _logger.error("Exception while handling callback timeout", e);
+      }
+      finally
+      {
+        // The task never repeats; cancel the timer so its thread can terminate.
+        Timer timer = _callback._timer;
+        if (timer != null)
+        {
+          timer.cancel();
+        }
+      }
+    }
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
new file mode 100644
index 0000000..d4cc9b0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.Criteria;
+import org.apache.helix.DataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.Criteria.DataSource;
+import org.apache.helix.josql.ClusterJosqlQueryProcessor;
+import org.apache.helix.josql.ZNRecordRow;
+import org.apache.log4j.Logger;
+import org.josql.Query;
+import org.josql.QueryExecutionException;
+import org.josql.QueryParseException;
+import org.josql.QueryResults;
+
+
+/**
+ * Turns a {@link Criteria} into a JoSQL query, runs it through a
+ * {@link ClusterJosqlQueryProcessor} and converts the result rows into string maps.
+ */
+public class CriteriaEvaluator
+{
+  private static Logger logger = Logger.getLogger(CriteriaEvaluator.class);
+
+  /**
+   * Evaluates the criteria against the data source named by the criteria.
+   *
+   * @param recipientCriteria criteria whose instance name, resource, partition and partition
+   *          state are used as LIKE patterns; an empty string matches everything
+   * @param manager manager handed to the query processor
+   * @return one map per result row with the keys {@code instanceName}, {@code resourceName},
+   *         {@code partitionName} and {@code partitionState}; an empty list if the query throws
+   */
+  public List<Map<String, String>> evaluateCriteria(Criteria recipientCriteria, HelixManager manager)
+  {
+    List<Map<String, String>> selected = new ArrayList<Map<String, String>>();
+
+    final String instanceName = recipientCriteria.getInstanceName();
+    final String resource = recipientCriteria.getResource();
+    final String partition = recipientCriteria.getPartition();
+    final String partitionState = recipientCriteria.getPartitionState();
+
+    // Projection: select the real column only for criteria that were specified, otherwise an
+    // empty string literal keeps the column positions stable.
+    StringBuilder fields = new StringBuilder();
+    fields.append(!instanceName.equals("") ? " " + ZNRecordRow.MAP_SUBKEY : " ''").append(",");
+    fields.append(!resource.equals("") ? " " + ZNRecordRow.ZNRECORD_ID : " ''").append(",");
+    fields.append(!partition.equals("") ? " " + ZNRecordRow.MAP_KEY : " ''").append(",");
+    fields.append(!partitionState.equals("") ? " " + ZNRecordRow.MAP_VALUE : " '' ");
+
+    // NOTE(review): criteria values are concatenated into the query text unescaped; a value
+    // containing a single quote would change the query. Confirm whether callers can pass
+    // untrusted values.
+    StringBuilder where = new StringBuilder();
+    where.append(ZNRecordRow.MAP_SUBKEY).append(" LIKE '").append(likeOperand(instanceName)).append(" AND ");
+    where.append(ZNRecordRow.ZNRECORD_ID).append(" LIKE '").append(likeOperand(resource)).append(" AND ");
+    where.append(ZNRecordRow.MAP_KEY).append(" LIKE '").append(likeOperand(partition)).append(" AND ");
+    where.append(ZNRecordRow.MAP_VALUE).append(" LIKE '").append(likeOperand(partitionState)).append(" AND ");
+    where.append(ZNRecordRow.MAP_SUBKEY).append(" IN ((SELECT [*]id FROM :LIVEINSTANCES))");
+
+    String queryTarget = recipientCriteria.getDataSource().toString() + ClusterJosqlQueryProcessor.FLATTABLE;
+
+    String josql = "SELECT DISTINCT " + fields + " FROM " + queryTarget + " WHERE " + where;
+
+    ClusterJosqlQueryProcessor processor = new ClusterJosqlQueryProcessor(manager);
+    List<Object> rows;
+    try
+    {
+      logger.info("JOSQL query: " + josql);
+      rows = processor.runJoSqlQuery(josql, null, null);
+    }
+    catch (Exception e)
+    {
+      logger.error("", e);
+      return selected;
+    }
+
+    for (Object rawRow : rows)
+    {
+      @SuppressWarnings("unchecked")
+      List<Object> columns = (List<Object>) rawRow;
+      Map<String, String> entry = new HashMap<String, String>();
+      // Column order follows the projection built above.
+      entry.put("instanceName", (String) columns.get(0));
+      entry.put("resourceName", (String) columns.get(1));
+      entry.put("partitionName", (String) columns.get(2));
+      entry.put("partitionState", (String) columns.get(3));
+      selected.add(entry);
+    }
+    logger.info("JOSQL query return " + selected.size() + " rows");
+    return selected;
+  }
+
+  /**
+   * Builds the tail of a {@code LIKE '} operand, including the closing quote: the value itself
+   * when one was given, otherwise the match-all wildcard.
+   */
+  private static String likeOperand(String value)
+  {
+    if (value.equals(""))
+    {
+      return "%' ";
+    }
+    return value + "'";
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
new file mode 100644
index 0000000..b589874
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -0,0 +1,392 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.messaging.handling.AsyncCallbackService;
+import org.apache.helix.messaging.handling.HelixTaskExecutor;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
+
+
+public class DefaultMessagingService implements ClusterMessagingService
+{
+ private final HelixManager _manager;
+ private final CriteriaEvaluator _evaluator;
+ private final HelixTaskExecutor _taskExecutor;
+ // TODO:rename to factory, this is not a service
+ private final AsyncCallbackService _asyncCallbackService;
+ private static Logger _logger =
+ Logger.getLogger(DefaultMessagingService.class);
+ ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded
+ = new ConcurrentHashMap<String, MessageHandlerFactory>();
+
+ public DefaultMessagingService(HelixManager manager)
+ {
+ _manager = manager;
+ _evaluator = new CriteriaEvaluator();
+ _taskExecutor = new HelixTaskExecutor();
+ _asyncCallbackService = new AsyncCallbackService();
+ _taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(),
+ _asyncCallbackService);
+ }
+
+ @Override
+ public int send(Criteria recipientCriteria, final Message messageTemplate)
+ {
+ return send(recipientCriteria, messageTemplate, null, -1);
+ }
+
+ @Override
+ public int send(final Criteria recipientCriteria,
+ final Message message,
+ AsyncCallback callbackOnReply,
+ int timeOut)
+ {
+ return send(recipientCriteria, message, callbackOnReply, timeOut, 0);
+ }
+
+ @Override
+ public int send(final Criteria recipientCriteria,
+ final Message message,
+ AsyncCallback callbackOnReply,
+ int timeOut,
+ int retryCount)
+ {
+ Map<InstanceType, List<Message>> generateMessage =
+ generateMessage(recipientCriteria, message);
+ int totalMessageCount = 0;
+ for (List<Message> messages : generateMessage.values())
+ {
+ totalMessageCount += messages.size();
+ }
+ _logger.info("Send " + totalMessageCount + " messages with criteria "
+ + recipientCriteria);
+ if (totalMessageCount == 0)
+ {
+ return 0;
+ }
+ String correlationId = null;
+ if (callbackOnReply != null)
+ {
+ int totalTimeout = timeOut * (retryCount + 1);
+ if (totalTimeout < 0)
+ {
+ totalTimeout = -1;
+ }
+ callbackOnReply.setTimeout(totalTimeout);
+ correlationId = UUID.randomUUID().toString();
+ for (List<Message> messages : generateMessage.values())
+ {
+ callbackOnReply.setMessagesSent(messages);
+ }
+ _asyncCallbackService.registerAsyncCallback(correlationId, callbackOnReply);
+ }
+
+ for (InstanceType receiverType : generateMessage.keySet())
+ {
+ List<Message> list = generateMessage.get(receiverType);
+ for (Message tempMessage : list)
+ {
+ tempMessage.setRetryCount(retryCount);
+ tempMessage.setExecutionTimeout(timeOut);
+ tempMessage.setSrcInstanceType(_manager.getInstanceType());
+ if (correlationId != null)
+ {
+ tempMessage.setCorrelationId(correlationId);
+ }
+
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
+ if (receiverType == InstanceType.CONTROLLER)
+ {
+ // _manager.getDataAccessor().setProperty(PropertyType.MESSAGES_CONTROLLER,
+ // tempMessage,
+ // tempMessage.getId());
+ accessor.setProperty(keyBuilder.controllerMessage(tempMessage.getId()),
+ tempMessage);
+ }
+
+ if (receiverType == InstanceType.PARTICIPANT)
+ {
+ accessor.setProperty(keyBuilder.message(tempMessage.getTgtName(),
+ tempMessage.getId()),
+ tempMessage);
+ }
+ }
+ }
+
+ if (callbackOnReply != null)
+ {
+ // start timer if timeout is set
+ callbackOnReply.startTimer();
+ }
+ return totalMessageCount;
+ }
+
+ private Map<InstanceType, List<Message>> generateMessage(final Criteria recipientCriteria,
+ final Message message)
+ {
+ Map<InstanceType, List<Message>> messagesToSendMap =
+ new HashMap<InstanceType, List<Message>>();
+ InstanceType instanceType = recipientCriteria.getRecipientInstanceType();
+
+ if (instanceType == InstanceType.CONTROLLER)
+ {
+ List<Message> messages = generateMessagesForController(message);
+ messagesToSendMap.put(InstanceType.CONTROLLER, messages);
+ // _dataAccessor.setControllerProperty(PropertyType.MESSAGES,
+ // newMessage.getRecord(), CreateMode.PERSISTENT);
+ }
+ else if (instanceType == InstanceType.PARTICIPANT)
+ {
+ List<Message> messages = new ArrayList<Message>();
+ List<Map<String, String>> matchedList =
+ _evaluator.evaluateCriteria(recipientCriteria, _manager);
+
+ if (!matchedList.isEmpty())
+ {
+ Map<String, String> sessionIdMap = new HashMap<String, String>();
+ if (recipientCriteria.isSessionSpecific())
+ {
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
+ List<LiveInstance> liveInstances =
+ accessor.getChildValues(keyBuilder.liveInstances());
+
+ for (LiveInstance liveInstance : liveInstances)
+ {
+ sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
+ }
+ }
+ for (Map<String, String> map : matchedList)
+ {
+ String id = UUID.randomUUID().toString();
+ Message newMessage = new Message(message.getRecord(), id);
+ String srcInstanceName = _manager.getInstanceName();
+ String tgtInstanceName = map.get("instanceName");
+ // Don't send message to self
+ if (recipientCriteria.isSelfExcluded()
+ && srcInstanceName.equalsIgnoreCase(tgtInstanceName))
+ {
+ continue;
+ }
+ newMessage.setSrcName(srcInstanceName);
+ newMessage.setTgtName(tgtInstanceName);
+ newMessage.setResourceName(map.get("resourceName"));
+ newMessage.setPartitionName(map.get("partitionName"));
+ if (recipientCriteria.isSessionSpecific())
+ {
+ newMessage.setTgtSessionId(sessionIdMap.get(tgtInstanceName));
+ }
+ messages.add(newMessage);
+ }
+ messagesToSendMap.put(InstanceType.PARTICIPANT, messages);
+ }
+ }
+ return messagesToSendMap;
+ }
+
+ private List<Message> generateMessagesForController(Message message)
+ {
+ List<Message> messages = new ArrayList<Message>();
+ String id = UUID.randomUUID().toString();
+ Message newMessage = new Message(message.getRecord(), id);
+ newMessage.setMsgId(id);
+ newMessage.setSrcName(_manager.getInstanceName());
+ newMessage.setTgtName("Controller");
+ messages.add(newMessage);
+ return messages;
+ }
+
+ @Override
+ public synchronized void registerMessageHandlerFactory(String type, MessageHandlerFactory factory)
+ {
+ if (_manager.isConnected())
+ {
+ registerMessageHandlerFactoryInternal(type, factory);
+ }
+ else
+ {
+ _messageHandlerFactoriestobeAdded.put(type, factory);
+ }
+ }
+
+ public synchronized void onConnected()
+ {
+ for(String type : _messageHandlerFactoriestobeAdded.keySet())
+ {
+ registerMessageHandlerFactoryInternal(type, _messageHandlerFactoriestobeAdded.get(type));
+ }
+ _messageHandlerFactoriestobeAdded.clear();
+ }
+
+ void registerMessageHandlerFactoryInternal(String type, MessageHandlerFactory factory)
+ {
+ _logger.info("registering msg factory for type " + type);
+ int threadpoolSize = HelixTaskExecutor.DEFAULT_PARALLEL_TASKS;
+ String threadpoolSizeStr = null;
+ String key = type + "." + HelixTaskExecutor.MAX_THREADS;
+
+ ConfigAccessor configAccessor = _manager.getConfigAccessor();
+ if(configAccessor != null)
+ {
+ ConfigScope scope = null;
+
+ // Read the participant config and cluster config for the per-message type thread pool size.
+ // participant config will override the cluster config.
+
+ if(_manager.getInstanceType() == InstanceType.PARTICIPANT || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
+ {
+ scope = new ConfigScopeBuilder().forCluster(_manager.getClusterName()).forParticipant(_manager.getInstanceName()).build();
+ threadpoolSizeStr = configAccessor.get(scope, key);
+ }
+
+ if(threadpoolSizeStr == null)
+ {
+ scope = new ConfigScopeBuilder().forCluster(_manager.getClusterName()).build();
+ threadpoolSizeStr = configAccessor.get(scope, key);
+ }
+ }
+
+ if(threadpoolSizeStr != null)
+ {
+ try
+ {
+ threadpoolSize = Integer.parseInt(threadpoolSizeStr);
+ if(threadpoolSize <= 0)
+ {
+ threadpoolSize = 1;
+ }
+ }
+ catch(Exception e)
+ {
+ _logger.error("", e);
+ }
+ }
+
+ _taskExecutor.registerMessageHandlerFactory(type, factory, threadpoolSize);
+ // Self-send a no-op message, so that the onMessage() call will be invoked
+ // again, and
+ // we have a chance to process the message that we received with the new
+ // added MessageHandlerFactory
+ // before the factory is added.
+ sendNopMessage();
+ }
+
+ public void sendNopMessage()
+ {
+ if (_manager.isConnected())
+ {
+ try
+ {
+ Message nopMsg = new Message(MessageType.NO_OP, UUID.randomUUID().toString());
+ nopMsg.setSrcName(_manager.getInstanceName());
+
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+
+ if (_manager.getInstanceType() == InstanceType.CONTROLLER
+ || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
+ {
+ nopMsg.setTgtName("Controller");
+ accessor.setProperty(keyBuilder.controllerMessage(nopMsg.getId()), nopMsg);
+ }
+
+ if (_manager.getInstanceType() == InstanceType.PARTICIPANT
+ || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
+ {
+ nopMsg.setTgtName(_manager.getInstanceName());
+ accessor.setProperty(keyBuilder.message(nopMsg.getTgtName(), nopMsg.getId()),
+ nopMsg);
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error(e);
+ }
+ }
+ }
+
+ public HelixTaskExecutor getExecutor()
+ {
+ return _taskExecutor;
+ }
+
+ @Override
+ public int sendAndWait(Criteria receipientCriteria,
+ Message message,
+ AsyncCallback asyncCallback,
+ int timeOut,
+ int retryCount)
+ {
+ int messagesSent =
+ send(receipientCriteria, message, asyncCallback, timeOut, retryCount);
+ if (messagesSent > 0)
+ {
+ while (!asyncCallback.isDone() && !asyncCallback.isTimedOut())
+ {
+ synchronized (asyncCallback)
+ {
+ try
+ {
+ asyncCallback.wait();
+ }
+ catch (InterruptedException e)
+ {
+ _logger.error(e);
+ asyncCallback.setInterrupted(true);
+ break;
+ }
+ }
+ }
+ }
+ else
+ {
+ _logger.warn("No messages sent. For Criteria:" + receipientCriteria);
+ }
+ return messagesSent;
+ }
+
+ @Override
+ public int sendAndWait(Criteria recipientCriteria,
+ Message message,
+ AsyncCallback asyncCallback,
+ int timeOut)
+ {
+ return sendAndWait(recipientCriteria, message, asyncCallback, timeOut, 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
new file mode 100644
index 0000000..6ad31d9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
@@ -0,0 +1,149 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging.handling;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.Map;
+import java.util.Random;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.messaging.AsyncCallback;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.log4j.Logger;
+
+
+public class AsyncCallbackService implements MessageHandlerFactory
+{
+ private final ConcurrentHashMap<String, AsyncCallback> _callbackMap = new ConcurrentHashMap<String, AsyncCallback>();
+ private static Logger _logger = Logger.getLogger(AsyncCallbackService.class);
+
+ public AsyncCallbackService()
+ {
+ }
+
+ public void registerAsyncCallback(String correlationId, AsyncCallback callback)
+ {
+ if (_callbackMap.containsKey(correlationId))
+ {
+ _logger.warn("correlation id " + correlationId + " already registered");
+ }
+ _logger.info("registering correlation id " + correlationId);
+ _callbackMap.put(correlationId, callback);
+ }
+
+ void verifyMessage(Message message)
+ {
+ if (!message.getMsgType().toString()
+ .equalsIgnoreCase(MessageType.TASK_REPLY.toString()))
+ {
+ String errorMsg = "Unexpected msg type for message " + message.getMsgId()
+ + " type:" + message.getMsgType() + " Expected : "
+ + MessageType.TASK_REPLY;
+ _logger.error(errorMsg);
+ throw new HelixException(errorMsg);
+ }
+ String correlationId = message.getCorrelationId();
+ if (correlationId == null)
+ {
+ String errorMsg = "Message " + message.getMsgId()
+ + " does not have correlation id";
+ _logger.error(errorMsg);
+ throw new HelixException(errorMsg);
+ }
+
+ if (!_callbackMap.containsKey(correlationId))
+ {
+ String errorMsg = "Message "
+ + message.getMsgId()
+ + " does not have correponding callback. Probably timed out already. Correlation id: "
+ + correlationId;
+ _logger.error(errorMsg);
+ throw new HelixException(errorMsg);
+ }
+ _logger.info("Verified reply message " + message.getMsgId()
+ + " correlation:" + correlationId);
+ }
+
+ @Override
+ public MessageHandler createHandler(Message message,
+ NotificationContext context)
+ {
+ verifyMessage(message);
+ return new AsyncCallbackMessageHandler(message.getCorrelationId(),message, context);
+ }
+
+ @Override
+ public String getMessageType()
+ {
+ return MessageType.TASK_REPLY.toString();
+ }
+
+ @Override
+ public void reset()
+ {
+
+ }
+
+ public class AsyncCallbackMessageHandler extends MessageHandler
+ {
+ private final String _correlationId;
+
+ public AsyncCallbackMessageHandler(String correlationId, Message message, NotificationContext context)
+ {
+ super(message, context);
+ _correlationId = correlationId;
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() throws InterruptedException
+ {
+ verifyMessage(_message);
+ HelixTaskResult result = new HelixTaskResult();
+ assert (_correlationId.equalsIgnoreCase(_message.getCorrelationId()));
+ _logger.info("invoking reply message " + _message.getMsgId()
+ + ", correlationid:" + _correlationId);
+
+ AsyncCallback callback = _callbackMap.get(_correlationId);
+ synchronized (callback)
+ {
+ callback.onReply(_message);
+ if (callback.isDone())
+ {
+ _logger.info("Removing finished callback, correlationid:"
+ + _correlationId);
+ _callbackMap.remove(_correlationId);
+ }
+ }
+ result.setSuccess(true);
+ return result;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type)
+ {
+ _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/GroupMessageHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/GroupMessageHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/GroupMessageHandler.java
new file mode 100644
index 0000000..a6ee6f0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/GroupMessageHandler.java
@@ -0,0 +1,116 @@
+package org.apache.helix.messaging.handling;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.Attributes;
+
+
+/**
+ * Tracks group messages (one parent message covering several partitions): counts completed
+ * sub-messages per parent and collects the current-state updates they produce.
+ */
+public class GroupMessageHandler
+{
+  /**
+   * A current-state delta together with the key it applies to. Static because it never uses
+   * the enclosing handler instance.
+   */
+  static class CurrentStateUpdate
+  {
+    final PropertyKey _key;
+    final CurrentState _curStateDelta;
+
+    public CurrentStateUpdate(PropertyKey key, CurrentState curStateDelta)
+    {
+      _key = key;
+      _curStateDelta = curStateDelta;
+    }
+
+    /**
+     * Merges the record of {@code curState} into this update's delta (mutates the delta).
+     */
+    public void merge(CurrentState curState)
+    {
+      _curStateDelta.getRecord().merge(curState.getRecord());
+    }
+  }
+
+  /**
+   * Per-parent bookkeeping: the parent message, the number of sub-messages still outstanding
+   * and the updates queued so far.
+   */
+  static class GroupMessageInfo
+  {
+    final Message _message;
+    final AtomicInteger _countDown;
+    final ConcurrentLinkedQueue<CurrentStateUpdate> _curStateUpdateList;
+
+    public GroupMessageInfo(Message message)
+    {
+      _message = message;
+      // One sub-message is expected per partition name of the parent message.
+      List<String> partitionNames = message.getPartitionNames();
+      _countDown = new AtomicInteger(partitionNames.size());
+      _curStateUpdateList = new ConcurrentLinkedQueue<CurrentStateUpdate>();
+    }
+
+    /**
+     * Combines the queued updates that share the same key path. The first update seen for a
+     * path is the accumulation target; later updates for that path are merged into its delta.
+     *
+     * @return one merged current state per distinct key
+     */
+    public Map<PropertyKey, CurrentState> merge()
+    {
+      Map<String, CurrentStateUpdate> curStateUpdateMap =
+          new HashMap<String, CurrentStateUpdate>();
+      for (CurrentStateUpdate update : _curStateUpdateList)
+      {
+        String path = update._key.getPath();
+        CurrentStateUpdate target = curStateUpdateMap.get(path);
+        if (target == null)
+        {
+          curStateUpdateMap.put(path, update);
+        }
+        else
+        {
+          target.merge(update._curStateDelta);
+        }
+      }
+
+      Map<PropertyKey, CurrentState> ret = new HashMap<PropertyKey, CurrentState>();
+      for (CurrentStateUpdate update : curStateUpdateMap.values())
+      {
+        ret.put(update._key, update._curStateDelta);
+      }
+
+      return ret;
+    }
+
+  }
+
+  // Parent message id -> bookkeeping for that group message.
+  final ConcurrentHashMap<String, GroupMessageInfo> _groupMsgMap;
+
+  public GroupMessageHandler()
+  {
+    _groupMsgMap = new ConcurrentHashMap<String, GroupMessageInfo>();
+  }
+
+  /**
+   * Starts tracking a group message; a message id that is already tracked is left untouched.
+   */
+  public void put(Message message)
+  {
+    _groupMsgMap.putIfAbsent(message.getId(), new GroupMessageInfo(message));
+  }
+
+  /**
+   * Records the completion of one sub-message.
+   *
+   * @return the parent's info (removed from tracking) once all sub-messages are completed,
+   *         otherwise {@code null}; also {@code null} when the sub-message has no parent id or
+   *         its parent is not tracked
+   */
+  public GroupMessageInfo onCompleteSubMessage(Message subMessage)
+  {
+    String parentMid = subMessage.getAttribute(Attributes.PARENT_MSG_ID);
+    if (parentMid == null)
+    {
+      // ConcurrentHashMap rejects null keys; treat a missing parent id like an unknown parent.
+      return null;
+    }
+    GroupMessageInfo info = _groupMsgMap.get(parentMid);
+    if (info != null)
+    {
+      int val = info._countDown.decrementAndGet();
+      if (val <= 0)
+      {
+        return _groupMsgMap.remove(parentMid);
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Queues a current-state delta produced by a sub-message under its parent. Ignored when the
+   * sub-message has no parent id or its parent is not tracked.
+   */
+  void addCurStateUpdate(Message subMessage, PropertyKey key, CurrentState delta)
+  {
+    String parentMid = subMessage.getAttribute(Attributes.PARENT_MSG_ID);
+    if (parentMid == null)
+    {
+      return;
+    }
+    GroupMessageInfo info = _groupMsgMap.get(parentMid);
+    if (info != null)
+    {
+      info._curStateUpdateList.add(new CurrentStateUpdate(key, delta));
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
new file mode 100644
index 0000000..102a984
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -0,0 +1,388 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging.handling;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecordBucketizer;
+import org.apache.helix.ZNRecordDelta;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecordDelta.MergeOperation;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelParser;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.util.StatusUpdateUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Message handler that executes one state-transition message against a
+ * {@link StateModel} and records the outcome in a {@link CurrentState} delta.
+ * <p>
+ * The transition method is looked up reflectively on the state model class and
+ * invoked while holding the state model's monitor. Afterwards the delta is
+ * either written through the data accessor or, for messages in group-message
+ * mode, handed to the executor's group message handler.
+ */
+public class HelixStateTransitionHandler extends MessageHandler
+{
+  /**
+   * Thrown when the from-state carried by a transition message does not match
+   * the state recorded for the partition in the current-state delta.
+   */
+  public static class HelixStateMismatchException extends Exception
+  {
+    public HelixStateMismatchException(String info)
+    {
+      super(info);
+    }
+  }
+
+  private static final Logger logger =
+      Logger.getLogger(HelixStateTransitionHandler.class);
+  private final StateModel _stateModel;
+  StatusUpdateUtil _statusUpdateUtil;
+  private final StateModelParser _transitionMethodFinder;
+  private final CurrentState _currentStateDelta;
+  // Set by onTimeout(); read when deciding how to report an InterruptedException
+  volatile boolean _isTimeout = false;
+  private final HelixTaskExecutor _executor;
+
+  /**
+   * @param stateModel        the state model the transition is invoked on
+   * @param message           the state-transition message to handle
+   * @param context           the notification context passed to the transition method
+   * @param currentStateDelta the delta that is updated with the transition outcome
+   * @param executor          the executor whose group message handler collects
+   *                          updates for messages in group-message mode
+   */
+  public HelixStateTransitionHandler(StateModel stateModel,
+                                     Message message,
+                                     NotificationContext context,
+                                     CurrentState currentStateDelta,
+                                     HelixTaskExecutor executor)
+  {
+    super(message, context);
+    _stateModel = stateModel;
+    _statusUpdateUtil = new StatusUpdateUtil();
+    _transitionMethodFinder = new StateModelParser();
+    _currentStateDelta = currentStateDelta;
+    _executor = executor;
+  }
+
+  /**
+   * Validates the message before the transition is invoked.
+   *
+   * @throws HelixException              if the message is not valid
+   * @throws HelixStateMismatchException if the message names a from-state other
+   *                                     than "*" that differs (ignoring case)
+   *                                     from the state recorded in the delta
+   */
+  private void prepareMessageExecution(HelixManager manager, Message message)
+      throws HelixException, HelixStateMismatchException
+  {
+    if (!message.isValid())
+    {
+      String errorMessage =
+          "Invalid Message, ensure that message: " + message
+              + " has all the required fields: "
+              + Arrays.toString(Message.Attributes.values());
+
+      _statusUpdateUtil.logError(message,
+                                 HelixStateTransitionHandler.class,
+                                 errorMessage,
+                                 manager.getHelixDataAccessor());
+      logger.error(errorMessage);
+      throw new HelixException(errorMessage);
+    }
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+
+    String partitionName = message.getPartitionName();
+    String fromState = message.getFromState();
+
+    // Compare the message's fromState with the state recorded in the delta
+    String state = _currentStateDelta.getState(partitionName);
+
+    if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state))
+    {
+      String errorMessage =
+          "Current state of stateModel does not match the fromState in Message"
+              + ", Current State:" + state + ", message expected:" + fromState
+              + ", partition: " + partitionName + ", from: " + message.getMsgSrc()
+              + ", to: " + message.getTgtName();
+
+      _statusUpdateUtil.logError(message,
+                                 HelixStateTransitionHandler.class,
+                                 errorMessage,
+                                 accessor);
+      logger.error(errorMessage);
+      throw new HelixStateMismatchException(errorMessage);
+    }
+  }
+
+  /**
+   * Applies the outcome of the transition to the state model and to the
+   * current-state delta, then publishes the delta.
+   * <p>
+   * Nothing is published when the manager's session id no longer matches the
+   * message's target session id, or when the transition was interrupted without
+   * a timeout having been signalled.
+   *
+   * @param taskResult the result of the transition attempt
+   * @param exception  the exception raised by the attempt, or null if none
+   */
+  void postExecutionMessage(HelixManager manager,
+                            Message message,
+                            NotificationContext context,
+                            HelixTaskResult taskResult,
+                            Exception exception)
+  {
+    String partitionKey = message.getPartitionName();
+    String resource = message.getResourceName();
+    String sessionId = message.getTgtSessionId();
+    String instanceName = manager.getInstanceName();
+
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    int bucketSize = message.getBucketSize();
+    ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(bucketSize);
+
+    // Lock the helix manager so that the session id will not change when we update
+    // the state model state. for zk current state it is OK as we have the per-session
+    // current state node
+    synchronized (manager)
+    {
+      if (!message.getTgtSessionId().equals(manager.getSessionId()))
+      {
+        logger.warn("Session id has changed. Skip postExecutionMessage. Old session "
+            + message.getExecutionSessionId() + " , new session : "
+            + manager.getSessionId());
+        return;
+      }
+
+      if (taskResult.isSucess())
+      {
+        String toState = message.getToState();
+        _currentStateDelta.setState(partitionKey, toState);
+
+        if (toState.equalsIgnoreCase("DROPPED"))
+        {
+          // for "OnOfflineToDROPPED" message, we need to remove the resource key record
+          // from the current state of the instance because the resource key is dropped.
+          // In the state model it will be stayed as "OFFLINE", which is OK.
+          ZNRecordDelta delta =
+              new ZNRecordDelta(_currentStateDelta.getRecord(), MergeOperation.SUBTRACT);
+          // Don't subtract simple fields since they contain stateModelDefRef
+          delta._record.getSimpleFields().clear();
+
+          List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
+          deltaList.add(delta);
+          _currentStateDelta.setDeltaList(deltaList);
+        }
+        else
+        {
+          // if the partition is not to be dropped, update _stateModel to the TO_STATE
+          _stateModel.updateState(toState);
+        }
+      }
+      else
+      {
+        if (exception instanceof HelixStateMismatchException)
+        {
+          // if fromState mismatch, set current state on zk to stateModel's current state
+          logger.warn("Force CurrentState on Zk to be stateModel's CurrentState. partitionKey: "
+              + partitionKey
+              + ", currentState: "
+              + _stateModel.getCurrentState()
+              + ", message: " + message);
+          _currentStateDelta.setState(partitionKey, _stateModel.getCurrentState());
+        }
+        else
+        {
+          StateTransitionError error =
+              new StateTransitionError(ErrorType.INTERNAL, ErrorCode.ERROR, exception);
+          if (exception instanceof InterruptedException)
+          {
+            if (_isTimeout)
+            {
+              error =
+                  new StateTransitionError(ErrorType.INTERNAL,
+                                           ErrorCode.TIMEOUT,
+                                           exception);
+            }
+            else
+            {
+              // State transition interrupted but not caused by timeout. Keep the current
+              // state in this case
+              logger.error("State transition interrupted but not timeout. Not updating state. Partition : "
+                  + message.getPartitionName() + " MsgId : " + message.getMsgId());
+              return;
+            }
+          }
+          _stateModel.rollbackOnError(message, context, error);
+          _currentStateDelta.setState(partitionKey, "ERROR");
+          _stateModel.updateState("ERROR");
+        }
+      }
+    }
+    try
+    {
+      // Update the ZK current state of the node
+      PropertyKey key = keyBuilder.currentState(instanceName,
+                                                sessionId,
+                                                resource,
+                                                bucketizer.getBucketName(partitionKey));
+      if (!_message.getGroupMessageMode())
+      {
+        accessor.updateProperty(key, _currentStateDelta);
+      }
+      else
+      {
+        // In group-message mode the update is collected and written later,
+        // once all sub-messages of the group have completed
+        _executor._groupMsgHandler.addCurStateUpdate(_message, key, _currentStateDelta);
+      }
+    }
+    catch (Exception e)
+    {
+      logger.error("Error when updating the state ", e);
+      StateTransitionError error =
+          new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
+      _stateModel.rollbackOnError(message, context, error);
+      _statusUpdateUtil.logError(message,
+                                 HelixStateTransitionHandler.class,
+                                 e,
+                                 "Error when update the state ",
+                                 accessor);
+    }
+  }
+
+  /**
+   * Validates the message, invokes the transition and post-processes the
+   * outcome, all while holding the state model's monitor.
+   *
+   * @return the task result; on failure it carries the exception and its
+   *         string form as the message
+   */
+  public HelixTaskResult handleMessageInternal(Message message,
+                                               NotificationContext context)
+  {
+    synchronized (_stateModel)
+    {
+      HelixTaskResult taskResult = new HelixTaskResult();
+      HelixManager manager = context.getManager();
+      HelixDataAccessor accessor = manager.getHelixDataAccessor();
+
+      _statusUpdateUtil.logInfo(message,
+                                HelixStateTransitionHandler.class,
+                                "Message handling task begin execute",
+                                accessor);
+      message.setExecuteStartTimeStamp(new Date().getTime());
+
+      Exception exception = null;
+      try
+      {
+        prepareMessageExecution(manager, message);
+        invoke(accessor, context, taskResult, message);
+      }
+      catch (HelixStateMismatchException e)
+      {
+        // A mismatch was already logged by prepareMessageExecution; only record
+        // it here. The state of the state model is left intact.
+        taskResult.setSuccess(false);
+        taskResult.setMessage(e.toString());
+        taskResult.setException(e);
+        exception = e;
+      }
+      catch (Exception e)
+      {
+        String errorMessage =
+            "Exception while executing a state transition task "
+                + message.getPartitionName();
+        logger.error(errorMessage, e);
+        // Unwrap an interruption raised inside the reflectively invoked method
+        // (instanceof is false for a null cause, so no separate null check)
+        if (e.getCause() instanceof InterruptedException)
+        {
+          e = (InterruptedException) e.getCause();
+        }
+        _statusUpdateUtil.logError(message,
+                                   HelixStateTransitionHandler.class,
+                                   e,
+                                   errorMessage,
+                                   accessor);
+        taskResult.setSuccess(false);
+        taskResult.setMessage(e.toString());
+        taskResult.setException(e);
+        taskResult.setInterrupted(e instanceof InterruptedException);
+        exception = e;
+      }
+      postExecutionMessage(manager, message, context, taskResult, exception);
+
+      return taskResult;
+    }
+  }
+
+  /**
+   * Looks up the transition method for the message's from/to states on the
+   * state model class and invokes it with the message and the context. The
+   * task result is marked successful after the invocation returns, and failed
+   * when no matching method is found.
+   */
+  private void invoke(HelixDataAccessor accessor,
+                      NotificationContext context,
+                      HelixTaskResult taskResult,
+                      Message message) throws IllegalAccessException,
+      InvocationTargetException,
+      InterruptedException
+  {
+    _statusUpdateUtil.logInfo(message,
+                              HelixStateTransitionHandler.class,
+                              "Message handling invoking",
+                              accessor);
+
+    // by default, we invoke state transition function in state model
+    String fromState = message.getFromState();
+    String toState = message.getToState();
+    Method methodToInvoke =
+        _transitionMethodFinder.getMethodForTransition(_stateModel.getClass(),
+                                                       fromState,
+                                                       toState,
+                                                       new Class[] { Message.class,
+                                                           NotificationContext.class });
+    if (methodToInvoke != null)
+    {
+      methodToInvoke.invoke(_stateModel, new Object[] { message, context });
+      taskResult.setSuccess(true);
+    }
+    else
+    {
+      String errorMessage =
+          "Unable to find method for transition from " + fromState + " to " + toState
+              + " in " + _stateModel.getClass();
+      logger.error(errorMessage);
+      taskResult.setSuccess(false);
+
+      _statusUpdateUtil.logError(message,
+                                 HelixStateTransitionHandler.class,
+                                 errorMessage,
+                                 accessor);
+    }
+  }
+
+  @Override
+  public HelixTaskResult handleMessage()
+  {
+    return handleMessageInternal(_message, _notificationContext);
+  }
+
+  /**
+   * Rolls the state model back for non-internal errors and, when the error code
+   * is ERROR, moves both the state model and the stored current state of the
+   * partition to "ERROR". Internal errors are only logged.
+   */
+  @Override
+  public void onError(Exception e, ErrorCode code, ErrorType type)
+  {
+    // All internal error has been processed already, so we can skip them
+    if (type == ErrorType.INTERNAL)
+    {
+      logger.error("Skip internal error " + e.getMessage() + " " + code);
+      return;
+    }
+    HelixManager manager = _notificationContext.getManager();
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    String instanceName = manager.getInstanceName();
+    String partition = _message.getPartitionName();
+    String resourceName = _message.getResourceName();
+    CurrentState currentStateDelta = new CurrentState(resourceName);
+
+    StateTransitionError error = new StateTransitionError(type, code, e);
+    _stateModel.rollbackOnError(_message, _notificationContext, error);
+    // if the transition is not canceled, it should go into error state
+    if (code == ErrorCode.ERROR)
+    {
+      currentStateDelta.setState(partition, "ERROR");
+      _stateModel.updateState("ERROR");
+
+      accessor.updateProperty(keyBuilder.currentState(instanceName,
+                                                      _message.getTgtSessionId(),
+                                                      resourceName),
+                              currentStateDelta);
+    }
+  }
+
+  /**
+   * Marks this handler as timed out so that a subsequent interruption is
+   * reported as a timeout.
+   */
+  @Override
+  public void onTimeout()
+  {
+    _isTimeout = true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
new file mode 100644
index 0000000..d14cfaa
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -0,0 +1,369 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging.handling;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Callable;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.messaging.handling.GroupMessageHandler.GroupMessageInfo;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
+import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.monitoring.StateTransitionContext;
+import org.apache.helix.monitoring.StateTransitionDataPoint;
+import org.apache.helix.util.StatusUpdateUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Callable that runs a {@link MessageHandler} for a single message.
+ * <p>
+ * {@link #call()} optionally arms a timeout timer, delegates to the handler,
+ * logs the outcome, and then post-processes the message: it removes the message
+ * through the data accessor, reports state-transition statistics, sends a reply
+ * if one is requested, and reports completion to the executor.
+ */
+public class HelixTask implements Callable<HelixTaskResult>
+{
+  private static final Logger logger = Logger.getLogger(HelixTask.class);
+  private final Message _message;
+  private final MessageHandler _handler;
+  private final NotificationContext _notificationContext;
+  private final HelixManager _manager;
+  StatusUpdateUtil _statusUpdateUtil;
+  HelixTaskExecutor _executor;
+  // Set by TimeoutCancelTask; read in call() to tell a timeout from a cancel
+  volatile boolean _isTimeout = false;
+
+  /**
+   * Timer task that flags the enclosing task as timed out, notifies the handler
+   * and asks the executor to cancel the task.
+   */
+  public class TimeoutCancelTask extends TimerTask
+  {
+    HelixTaskExecutor _executor;
+    Message _message;
+    NotificationContext _context;
+
+    public TimeoutCancelTask(HelixTaskExecutor executor,
+                             Message message,
+                             NotificationContext context)
+    {
+      _executor = executor;
+      _message = message;
+      _context = context;
+    }
+
+    @Override
+    public void run()
+    {
+      // Flag the timeout before cancelling so call() sees it when interrupted
+      _isTimeout = true;
+      logger.warn("Message time out, canceling. id:" + _message.getMsgId()
+          + " timeout : " + _message.getExecutionTimeout());
+      _handler.onTimeout();
+      _executor.cancelTask(_message, _context);
+    }
+
+  }
+
+  /**
+   * @param message             the message to handle
+   * @param notificationContext the context; its manager is used for all data access
+   * @param handler             the handler that processes the message
+   * @param executor            the executor used for cancel, retry and completion reporting
+   */
+  public HelixTask(Message message,
+                   NotificationContext notificationContext,
+                   MessageHandler handler,
+                   HelixTaskExecutor executor) throws Exception
+  {
+    this._notificationContext = notificationContext;
+    this._message = message;
+    this._handler = handler;
+    this._manager = notificationContext.getManager();
+    _statusUpdateUtil = new StatusUpdateUtil();
+    _executor = executor;
+  }
+
+  /**
+   * Handles the message and post-processes the result.
+   * <p>
+   * When the handling was interrupted by a timeout and the message still has
+   * retries left, the message is re-scheduled with a decremented retry count
+   * and the method returns without post-processing.
+   *
+   * @return the result produced by the handler, or the locally created result
+   *         when the handler threw
+   */
+  @Override
+  public HelixTaskResult call()
+  {
+    // Start the timeout TimerTask, if necessary
+    Timer timer = null;
+    if (_message.getExecutionTimeout() > 0)
+    {
+      timer = new Timer(true);
+      timer.schedule(new TimeoutCancelTask(_executor, _message, _notificationContext),
+                     _message.getExecutionTimeout());
+      logger.info("Message starts with timeout " + _message.getExecutionTimeout()
+          + " MsgId:" + _message.getMsgId());
+    }
+    else
+    {
+      logger.info("Message does not have timeout. MsgId:" + _message.getMsgId() + "/"
+          + _message.getPartitionName());
+    }
+
+    HelixTaskResult taskResult = new HelixTaskResult();
+
+    Exception exception = null;
+    ErrorType type = ErrorType.INTERNAL;
+    ErrorCode code = ErrorCode.ERROR;
+
+    long start = System.currentTimeMillis();
+    logger.info("msg:" + _message.getMsgId() + " handling task begin, at: " + start);
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    _statusUpdateUtil.logInfo(_message,
+                              HelixTask.class,
+                              "Message handling task begin execute",
+                              accessor);
+    _message.setExecuteStartTimeStamp(new Date().getTime());
+
+    // Handle the message
+    try
+    {
+      taskResult = _handler.handleMessage();
+      exception = taskResult.getException();
+    }
+    catch (InterruptedException e)
+    {
+      _statusUpdateUtil.logError(_message,
+                                 HelixTask.class,
+                                 e,
+                                 "State transition interrupted, timeout:" + _isTimeout,
+                                 accessor);
+      logger.info("Message " + _message.getMsgId() + " is interrupted");
+      taskResult.setInterrupted(true);
+      taskResult.setException(e);
+      exception = e;
+    }
+    catch (Exception e)
+    {
+      String errorMessage =
+          "Exception while executing a message. " + e + " msgId: " + _message.getMsgId()
+              + " type: " + _message.getMsgType();
+      logger.error(errorMessage, e);
+      _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, accessor);
+      taskResult.setSuccess(false);
+      taskResult.setException(e);
+      taskResult.setMessage(e.getMessage());
+      exception = e;
+    }
+
+    // Cancel the timer since the handling is done
+    // it is fine if the TimerTask for canceling is called already
+    if (timer != null)
+    {
+      timer.cancel();
+    }
+
+    if (taskResult.isSucess())
+    {
+      _statusUpdateUtil.logInfo(_message,
+                                _handler.getClass(),
+                                "Message handling task completed successfully",
+                                accessor);
+      logger.info("Message " + _message.getMsgId() + " completed.");
+    }
+    else if (taskResult.isInterrupted())
+    {
+      logger.info("Message " + _message.getMsgId() + " is interrupted");
+      code = _isTimeout ? ErrorCode.TIMEOUT : ErrorCode.CANCEL;
+      if (_isTimeout)
+      {
+        int retryCount = _message.getRetryCount();
+        logger.info("Message timeout, retry count: " + retryCount + " MSGID:"
+            + _message.getMsgId());
+        _statusUpdateUtil.logInfo(_message,
+                                  _handler.getClass(),
+                                  "Message handling task timeout, retryCount:"
+                                      + retryCount,
+                                  accessor);
+        // On a timeout with retries left, re-schedule the message with one
+        // retry fewer and skip the post-processing below
+        if (retryCount > 0)
+        {
+          _message.setRetryCount(retryCount - 1);
+          _executor.scheduleTask(_message, _handler, _notificationContext);
+          return taskResult;
+        }
+      }
+    }
+    else
+    {
+      // logging for errors; parts are separated so the id, the result message
+      // and the exception do not run into each other
+      String errorMsg =
+          "Message execution failed. msgId: " + _message.getMsgId()
+              + ", message: " + taskResult.getMessage();
+      if (exception != null)
+      {
+        errorMsg += ", exception: " + exception;
+      }
+      logger.error(errorMsg, exception);
+      _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, accessor);
+    }
+
+    // Post-processing for the finished task
+    try
+    {
+      if (!_message.getGroupMessageMode())
+      {
+        removeMessageFromZk(accessor, _message);
+        reportMessageStat(_manager, _message, taskResult);
+        sendReply(accessor, _message, taskResult);
+      }
+      else
+      {
+        // Non-null only once every sub-message of the group has completed
+        GroupMessageInfo info = _executor._groupMsgHandler.onCompleteSubMessage(_message);
+        if (info != null)
+        {
+          // TODO: changed to async update
+          // group update current state
+          Map<PropertyKey, CurrentState> curStateMap = info.merge();
+          for (Map.Entry<PropertyKey, CurrentState> entry : curStateMap.entrySet())
+          {
+            accessor.updateProperty(entry.getKey(), entry.getValue());
+          }
+
+          // remove group message
+          removeMessageFromZk(accessor, _message);
+          reportMessageStat(_manager, _message, taskResult);
+          sendReply(accessor, _message, taskResult);
+        }
+      }
+      _executor.reportCompletion(_message);
+    }
+    // TODO: capture errors and log here
+    catch (Exception e)
+    {
+      String errorMessage =
+          "Exception after executing a message, msgId: " + _message.getMsgId()
+              + ", exception: " + e;
+      logger.error(errorMessage, e);
+      _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, accessor);
+      exception = e;
+      type = ErrorType.FRAMEWORK;
+      code = ErrorCode.ERROR;
+    }
+    finally
+    {
+      long end = System.currentTimeMillis();
+      logger.info("msg:" + _message.getMsgId() + " handling task completed, results:"
+          + taskResult.isSucess() + ", at: " + end + ", took:" + (end - start));
+
+      // Notify the handler about any error happened in the handling procedure, so that
+      // the handler have chance to finally cleanup
+      if (exception != null)
+      {
+        _handler.onError(exception, code, type);
+      }
+    }
+    return taskResult;
+  }
+
+  /**
+   * Removes the message through the data accessor: from the controller's
+   * messages when the target name is "controller" (ignoring case), otherwise
+   * from this instance's messages.
+   */
+  private void removeMessageFromZk(HelixDataAccessor accessor, Message message)
+  {
+    Builder keyBuilder = accessor.keyBuilder();
+    if (message.getTgtName().equalsIgnoreCase("controller"))
+    {
+      // TODO: removeProperty returns boolean
+      accessor.removeProperty(keyBuilder.controllerMessage(message.getMsgId()));
+    }
+    else
+    {
+      accessor.removeProperty(keyBuilder.message(_manager.getInstanceName(),
+                                                 message.getMsgId()));
+    }
+  }
+
+  /**
+   * Sends a reply carrying the task result to the message's source, provided
+   * the message has a correlation id and is not itself a TASK_REPLY.
+   */
+  private void sendReply(HelixDataAccessor accessor,
+                         Message message,
+                         HelixTaskResult taskResult)
+  {
+    if (_message.getCorrelationId() != null
+        && !message.getMsgType().equals(MessageType.TASK_REPLY.toString()))
+    {
+      logger.info("Sending reply for message " + message.getCorrelationId());
+      _statusUpdateUtil.logInfo(message, HelixTask.class, "Sending reply", accessor);
+
+      taskResult.getTaskResultMap().put("SUCCESS", "" + taskResult.isSucess());
+      taskResult.getTaskResultMap().put("INTERRUPTED", "" + taskResult.isInterrupted());
+      if (!taskResult.isSucess())
+      {
+        taskResult.getTaskResultMap().put("ERRORINFO", taskResult.getMessage());
+      }
+      Message replyMessage =
+          Message.createReplyMessage(_message,
+                                     _manager.getInstanceName(),
+                                     taskResult.getTaskResultMap());
+      replyMessage.setSrcInstanceType(_manager.getInstanceType());
+
+      // The reply is written where the original sender reads its messages
+      if (message.getSrcInstanceType() == InstanceType.PARTICIPANT)
+      {
+        Builder keyBuilder = accessor.keyBuilder();
+        accessor.setProperty(keyBuilder.message(message.getMsgSrc(),
+                                                replyMessage.getMsgId()),
+                             replyMessage);
+      }
+      else if (message.getSrcInstanceType() == InstanceType.CONTROLLER)
+      {
+        Builder keyBuilder = accessor.keyBuilder();
+        accessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId()),
+                             replyMessage);
+      }
+      _statusUpdateUtil.logInfo(message, HelixTask.class, "1 msg replied to "
+          + replyMessage.getTgtName(), accessor);
+    }
+  }
+
+  /**
+   * Reports total and execution delay of a STATE_TRANSITION message to the
+   * participant monitor. Other message types are ignored, and nothing is
+   * reported unless both the read and the execution-start timestamps are set
+   * and both delays are positive.
+   */
+  private void reportMessageStat(HelixManager manager,
+                                 Message message,
+                                 HelixTaskResult taskResult)
+  {
+    // report stat
+    if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
+    {
+      return;
+    }
+    long now = new Date().getTime();
+    long msgReadTime = message.getReadTimeStamp();
+    long msgExecutionStartTime = message.getExecuteStartTimeStamp();
+    if (msgReadTime != 0 && msgExecutionStartTime != 0)
+    {
+      long totalDelay = now - msgReadTime;
+      long executionDelay = now - msgExecutionStartTime;
+      if (totalDelay > 0 && executionDelay > 0)
+      {
+        String fromState = message.getFromState();
+        String toState = message.getToState();
+        String transition = fromState + "--" + toState;
+
+        StateTransitionContext cxt =
+            new StateTransitionContext(manager.getClusterName(),
+                                       manager.getInstanceName(),
+                                       message.getResourceName(),
+                                       transition);
+
+        StateTransitionDataPoint data =
+            new StateTransitionDataPoint(totalDelay,
+                                         executionDelay,
+                                         taskResult.isSucess());
+        _executor.getParticipantMonitor().reportTransitionStat(cxt, data);
+      }
+    }
+    else
+    {
+      logger.warn("message read time and start execution time not recorded.");
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
new file mode 100644
index 0000000..daff92b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -0,0 +1,638 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging.handling;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigScope;
+import org.apache.helix.ConfigScopeBuilder;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.MessageListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.NotificationContext.Type;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.Attributes;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.monitoring.ParticipantMonitor;
+import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.helix.util.StatusUpdateUtil;
+import org.apache.log4j.Logger;
+
+
+public class HelixTaskExecutor implements MessageListener
+{
+ // TODO: we need to further design how to throttle this.
+ // From storage point of view, only bootstrap case is expensive
+ // and we need to throttle, which is mostly IO / network bounded.
+ // Thread pool size used when a handler factory is registered without one
+ public static final int DEFAULT_PARALLEL_TASKS = 40;
+ // TODO: create per-task type threadpool with customizable pool size
+ // Futures of scheduled tasks, keyed by "<msgId>/<partitionName>"
+ protected final Map<String, Future<HelixTaskResult>> _taskMap;
+ // Monitor held while scheduling, cancelling and completing tasks
+ private final Object _lock;
+ private final StatusUpdateUtil _statusUpdateUtil;
+ private final ParticipantMonitor _monitor;
+ // Resource-scope config key holding the per-resource thread pool size
+ public static final String MAX_THREADS =
+     "maxThreads";
+
+ // Registered handler factories, keyed by message type
+ final ConcurrentHashMap<String, MessageHandlerFactory> _handlerFactoryMap =
+     new ConcurrentHashMap<String, MessageHandlerFactory>();
+
+ // Thread pools, keyed by message type or, for per-resource pools, by
+ // "<STATE_TRANSITION type>.<resourceName>"
+ final ConcurrentHashMap<String, ExecutorService> _threadpoolMap =
+     new ConcurrentHashMap<String, ExecutorService>();
+
+ private static final Logger LOG =
+     Logger.getLogger(HelixTaskExecutor.class);
+
+ // Resources whose thread pool config has already been checked, mapped to the
+ // size that was found (-1 when none was configured or it could not be parsed)
+ Map<String, Integer> _resourceThreadpoolSizeMap =
+     new ConcurrentHashMap<String, Integer>();
+
+ final GroupMessageHandler _groupMsgHandler;
+
+ /**
+  * Creates an executor with no scheduled tasks and no registered handler
+  * factories, then calls {@code startMonitorThread()}.
+  */
+ public HelixTaskExecutor()
+ {
+   this._taskMap = new ConcurrentHashMap<String, Future<HelixTaskResult>>();
+   this._groupMsgHandler = new GroupMessageHandler();
+
+   this._lock = new Object();
+   this._statusUpdateUtil = new StatusUpdateUtil();
+   this._monitor = new ParticipantMonitor();
+
+   startMonitorThread();
+ }
+
+ /**
+  * Registers a handler factory for a message type with a thread pool of
+  * {@link #DEFAULT_PARALLEL_TASKS} threads.
+  *
+  * @param type    the message type the factory handles
+  * @param factory the factory to register
+  */
+ public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory)
+ {
+   final int defaultPoolSize = DEFAULT_PARALLEL_TASKS;
+   registerMessageHandlerFactory(type, factory, defaultPoolSize);
+ }
+
+ /**
+  * Registers a handler factory for a message type together with a fixed-size
+  * thread pool for that type. A second registration for an already registered
+  * type is logged and ignored.
+  *
+  * @param type           the message type the factory handles
+  * @param factory        the factory to register
+  * @param threadpoolSize the number of threads in the pool created for the type
+  * @throws HelixException if {@code type} differs (ignoring case) from the
+  *                        factory's own message type
+  */
+ public void registerMessageHandlerFactory(String type,
+                                           MessageHandlerFactory factory,
+                                           int threadpoolSize)
+ {
+   if (_handlerFactoryMap.containsKey(type))
+   {
+     LOG.error("Ignoring duplicate msg handler factory for type " + type);
+     return;
+   }
+
+   boolean typeMatchesFactory = type.equalsIgnoreCase(factory.getMessageType());
+   if (!typeMatchesFactory)
+   {
+     throw new HelixException("Message factory type mismatch. Type: " + type
+         + " factory : " + factory.getMessageType());
+   }
+
+   _handlerFactoryMap.put(type, factory);
+   ExecutorService pool = Executors.newFixedThreadPool(threadpoolSize);
+   _threadpoolMap.put(type, pool);
+   LOG.info("Adding msg factory for type " + type + " threadpool size "
+       + threadpoolSize);
+ }
+
+ /**
+  * @return the participant monitor created together with this executor
+  */
+ public ParticipantMonitor getParticipantMonitor()
+ {
+   return this._monitor;
+ }
+
+ /**
+  * Placeholder for starting a thread that monitors task completions;
+  * currently does nothing.
+  */
+ private void startMonitorThread()
+ {
+   // intentionally empty: no monitoring thread is started yet
+ }
+
+ /**
+  * Reads the per-resource thread pool size (config key {@link #MAX_THREADS})
+  * the first time a resource is seen. When a positive size is configured, a
+  * dedicated fixed-size thread pool is created for the resource's state
+  * transitions. The outcome is remembered, so the config is read at most once
+  * per resource; -1 is remembered when no usable size was found.
+  *
+  * @param resourceName the resource to check
+  * @param manager      the manager supplying the config accessor and cluster name
+  */
+ void checkResourceConfig(String resourceName, HelixManager manager)
+ {
+   if (_resourceThreadpoolSizeMap.containsKey(resourceName))
+   {
+     // Already checked for this resource
+     return;
+   }
+
+   int threadpoolSize = -1;
+   ConfigAccessor configAccessor = manager.getConfigAccessor();
+   if (configAccessor != null)
+   {
+     ConfigScope scope =
+         new ConfigScopeBuilder().forCluster(manager.getClusterName())
+                                 .forResource(resourceName)
+                                 .build();
+
+     String threadpoolSizeStr = configAccessor.get(scope, MAX_THREADS);
+     if (threadpoolSizeStr != null)
+     {
+       try
+       {
+         threadpoolSize = Integer.parseInt(threadpoolSizeStr);
+       }
+       catch (NumberFormatException e)
+       {
+         // Keep the -1 default: an unparsable value means no per-resource pool
+         LOG.error("Invalid " + MAX_THREADS + " value '" + threadpoolSizeStr
+             + "' for resource " + resourceName
+             + ", not creating a per resource threadpool", e);
+       }
+     }
+   }
+
+   if (threadpoolSize > 0)
+   {
+     String key = MessageType.STATE_TRANSITION.toString() + "." + resourceName;
+     _threadpoolMap.put(key, Executors.newFixedThreadPool(threadpoolSize));
+     LOG.info("Adding per resource threadpool for resource " + resourceName
+         + " with size " + threadpoolSize);
+   }
+   _resourceThreadpoolSizeMap.put(resourceName, threadpoolSize);
+ }
+
+ /**
+  * Picks the thread pool a message should run on. The pool registered for the
+  * message type is the default; a STATE_TRANSITION message whose resource has
+  * its own pool (key "<msgType>.<resourceName>") uses that pool instead.
+  *
+  * @param message the message to find a pool for
+  * @return the selected pool, or null when no pool is registered for the
+  *         message type and no per-resource pool applies
+  **/
+ ExecutorService findExecutorServiceForMsg(Message message)
+ {
+   ExecutorService selected = _threadpoolMap.get(message.getMsgType());
+
+   boolean isStateTransition =
+       message.getMsgType().equals(MessageType.STATE_TRANSITION.toString());
+   if (!isStateTransition)
+   {
+     return selected;
+   }
+
+   String resourceName = message.getResourceName();
+   if (resourceName == null)
+   {
+     return selected;
+   }
+
+   String perResourceKey = message.getMsgType() + "." + resourceName;
+   if (_threadpoolMap.containsKey(perResourceKey))
+   {
+     LOG.info("Find per-resource thread pool with key " + perResourceKey);
+     selected = _threadpoolMap.get(perResourceKey);
+   }
+   return selected;
+ }
+
+ /**
+  * Submits a task for the message to the matching thread pool and remembers
+  * its future under the task id "<msgId>/<partitionName>". A message whose
+  * task id is already tracked is not submitted again; only a warning is
+  * logged. Any exception raised while scheduling is logged, not propagated.
+  *
+  * @param message             the message to handle
+  * @param handler             the handler that will process the message
+  * @param notificationContext the context whose manager is used for logging and config
+  */
+ public void scheduleTask(Message message,
+                          MessageHandler handler,
+                          NotificationContext notificationContext)
+ {
+   assert (handler != null);
+   synchronized (_lock)
+   {
+     try
+     {
+       final String taskId = message.getMsgId() + "/" + message.getPartitionName();
+
+       boolean isStateTransition =
+           message.getMsgType().equals(MessageType.STATE_TRANSITION.toString());
+       if (isStateTransition)
+       {
+         // Make sure a configured per-resource pool exists before picking one
+         checkResourceConfig(message.getResourceName(), notificationContext.getManager());
+       }
+       LOG.info("Scheduling message: " + taskId);
+
+       _statusUpdateUtil.logInfo(message,
+                                 HelixTaskExecutor.class,
+                                 "Message handling task scheduled",
+                                 notificationContext.getManager().getHelixDataAccessor());
+
+       HelixTask task = new HelixTask(message, notificationContext, handler, this);
+       if (_taskMap.containsKey(taskId))
+       {
+         _statusUpdateUtil.logWarning(message,
+                                      HelixTaskExecutor.class,
+                                      "Message handling task already sheduled for "
+                                          + taskId,
+                                      notificationContext.getManager()
+                                                         .getHelixDataAccessor());
+       }
+       else
+       {
+         LOG.info("Message:" + taskId + " handling task scheduled");
+         ExecutorService pool = findExecutorServiceForMsg(message);
+         Future<HelixTaskResult> pending = pool.submit(task);
+         _taskMap.put(taskId, pending);
+       }
+     }
+     catch (Exception e)
+     {
+       LOG.error("Error while executing task." + message, e);
+
+       _statusUpdateUtil.logError(message,
+                                  HelixTaskExecutor.class,
+                                  e,
+                                  "Error while executing task " + e,
+                                  notificationContext.getManager()
+                                                     .getHelixDataAccessor());
+     }
+   }
+ }
+
+ /**
+  * Attempts to cancel the task that was scheduled for the given message.
+  * <p>
+  * The task is looked up under the id "msgId/partitionName". If it is found,
+  * its future is cancelled with interruption and, when the cancel succeeds,
+  * the entry is removed from the task map. Every outcome is recorded as a
+  * status update.
+  *
+  * @param message             the message whose task should be cancelled
+  * @param notificationContext context supplying the manager and data accessor
+  */
+ public void cancelTask(Message message, NotificationContext notificationContext)
+ {
+   synchronized (_lock)
+   {
+     String taskId = message.getMsgId() + "/" + message.getPartitionName();
+
+     if (!_taskMap.containsKey(taskId))
+     {
+       _statusUpdateUtil.logWarning(message,
+                                    HelixTaskExecutor.class,
+                                    "Future not found when trying to cancel " + taskId,
+                                    notificationContext.getManager()
+                                                       .getHelixDataAccessor());
+       return;
+     }
+
+     _statusUpdateUtil.logInfo(message,
+                               HelixTaskExecutor.class,
+                               "Trying to cancel the future for " + taskId,
+                               notificationContext.getManager().getHelixDataAccessor());
+     Future<HelixTaskResult> pending = _taskMap.get(taskId);
+
+     // cancel(true) interrupts the thread if the task is still running, so
+     // state transition callbacks should return promptly when interrupted.
+     boolean cancelled = pending.cancel(true);
+     if (cancelled)
+     {
+       _statusUpdateUtil.logInfo(message,
+                                 HelixTaskExecutor.class,
+                                 "Canceled " + taskId,
+                                 notificationContext.getManager().getHelixDataAccessor());
+       _taskMap.remove(taskId);
+     }
+     else
+     {
+       _statusUpdateUtil.logInfo(message,
+                                 HelixTaskExecutor.class,
+                                 "false when trying to cancel the message " + taskId,
+                                 notificationContext.getManager()
+                                                    .getHelixDataAccessor());
+     }
+   }
+ }
+
+ /**
+  * Records that the task for the given message has finished and stops
+  * tracking it.
+  * <p>
+  * Logs the time elapsed since the message's execute-start timestamp and
+  * removes the "msgId/partitionName" entry from the task map, warning if no
+  * such entry exists.
+  *
+  * @param message the message whose task has completed
+  */
+ protected void reportCompletion(Message message)
+ {
+   synchronized (_lock)
+   {
+     String taskId = message.getMsgId() + "/" + message.getPartitionName();
+     LOG.info("message finished: " + taskId + ", took "
+         + (System.currentTimeMillis() - message.getExecuteStartTimeStamp()));
+     if (_taskMap.containsKey(taskId))
+     {
+       _taskMap.remove(taskId);
+     }
+     else
+     {
+       LOG.warn("message " + taskId + " not found in task map");
+     }
+   }
+ }
+
+ /**
+  * Writes the given messages back through the accessor in a single batch.
+  *
+  * @param readMsgs     messages to persist
+  * @param accessor     data accessor used to build the keys and write the messages
+  * @param instanceName instance whose message keys are built
+  */
+ private void updateMessageState(List<Message> readMsgs,
+                                 HelixDataAccessor accessor,
+                                 String instanceName)
+ {
+   Builder builder = accessor.keyBuilder();
+   List<PropertyKey> keys = new ArrayList<PropertyKey>();
+
+   // Keys are collected in the same order as the messages they belong to.
+   Iterator<Message> it = readMsgs.iterator();
+   while (it.hasNext())
+   {
+     Message current = it.next();
+     keys.add(current.getKey(builder, instanceName));
+   }
+
+   accessor.setChildren(keys, readMsgs);
+ }
+
+ /**
+  * Processes a batch of messages delivered to this instance.
+  * <p>
+  * On a FINALIZE notification all handler factories are reset and every
+  * tracked task is cancelled. Otherwise the messages are sorted by creation
+  * time and, for each one:
+  * <ul>
+  * <li>NO_OP messages are removed;</li>
+  * <li>messages whose target session id neither equals the current session id
+  * nor is "*" are removed with a warning;</li>
+  * <li>messages that are not in state NEW are skipped;</li>
+  * <li>handlers are created; if none can be created the message is left as
+  * is, and if creation throws the message is marked UNPROCESSABLE;</li>
+  * <li>the message is marked READ and, for non-controller state transition
+  * messages, current-state meta data is prepared for resources that do not
+  * have it yet.</li>
+  * </ul>
+  * Finally the current-state meta data and the READ messages are written in
+  * batch and every created handler is scheduled.
+  *
+  * @param instanceName  name of the instance the messages were sent to
+  * @param messages      the messages to process; may be null or empty
+  * @param changeContext the notification context of this callback
+  */
+ @Override
+ public void onMessage(String instanceName,
+                       List<Message> messages,
+                       NotificationContext changeContext)
+ {
+   // If FINALIZE notification comes, reset all handler factories
+   // and cancel all outstanding tasks
+   // TODO: see if we should have a separate notification call for resetting
+   if (changeContext.getType() == Type.FINALIZE)
+   {
+     LOG.info("Get FINALIZE notification");
+     for (MessageHandlerFactory factory : _handlerFactoryMap.values())
+     {
+       factory.reset();
+     }
+
+     // Cancel all scheduled futures.
+     // NOTE(review): this section is intentionally left outside
+     // synchronized (_lock), as in the original where the guard was commented
+     // out - confirm whether that is still required.
+     for (Future<HelixTaskResult> f : _taskMap.values())
+     {
+       f.cancel(true);
+     }
+     _taskMap.clear();
+     return;
+   }
+
+   HelixManager manager = changeContext.getManager();
+   HelixDataAccessor accessor = manager.getHelixDataAccessor();
+   Builder keyBuilder = accessor.keyBuilder();
+
+   if (messages == null || messages.isEmpty())
+   {
+     LOG.info("No Messages to process");
+     return;
+   }
+
+   // sort message by creation timestamp, so message created earlier is processed first
+   Collections.sort(messages, Message.CREATE_TIME_COMPARATOR);
+
+   // message handlers created
+   List<MessageHandler> handlers = new ArrayList<MessageHandler>();
+
+   // message read
+   List<Message> readMsgs = new ArrayList<Message>();
+
+   String sessionId = manager.getSessionId();
+   List<String> curResourceNames =
+       accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
+   List<PropertyKey> createCurStateKeys = new ArrayList<PropertyKey>();
+   List<CurrentState> metaCurStates = new ArrayList<CurrentState>();
+   Set<String> createCurStateNames = new HashSet<String>();
+
+   changeContext.add(NotificationContext.TASK_EXECUTOR_KEY, this);
+   for (Message message : messages)
+   {
+     // nop messages are simply removed. It is used to trigger onMessage() in
+     // situations such as register a new message handler factory
+     if (message.getMsgType().equalsIgnoreCase(MessageType.NO_OP.toString()))
+     {
+       LOG.info("Dropping NO-OP message. mid: " + message.getId() + ", from: "
+           + message.getMsgSrc());
+       accessor.removeProperty(message.getKey(keyBuilder, instanceName));
+       continue;
+     }
+
+     String tgtSessionId = message.getTgtSessionId();
+
+     // if sessionId not match, remove it. The wildcard check is written
+     // constant-first so a message without a target session id is treated as
+     // a mismatch instead of aborting the whole batch with an NPE.
+     if (!sessionId.equals(tgtSessionId) && !"*".equals(tgtSessionId))
+     {
+       String warningMessage =
+           "SessionId does NOT match. expected sessionId: " + sessionId
+               + ", tgtSessionId in message: " + tgtSessionId + ", messageId: "
+               + message.getMsgId();
+       LOG.warn(warningMessage);
+       accessor.removeProperty(message.getKey(keyBuilder, instanceName));
+       _statusUpdateUtil.logWarning(message,
+                                    HelixStateMachineEngine.class,
+                                    warningMessage,
+                                    accessor);
+       continue;
+     }
+
+     // don't process message that is of READ or UNPROCESSABLE state
+     if (MessageState.NEW != message.getMsgState())
+     {
+       // It happens because we don't delete message right after
+       // read. Instead we keep it until the current state is updated.
+       // We will read the message again if there is a new message but we
+       // check for the status and ignore if its already read
+       LOG.trace("Message already read. mid: " + message.getMsgId());
+       continue;
+     }
+
+     // create message handlers, if handlers not found, leave its state as NEW
+     try
+     {
+       List<MessageHandler> createHandlers =
+           createMessageHandlers(message, changeContext);
+       if (createHandlers.isEmpty())
+       {
+         continue;
+       }
+       handlers.addAll(createHandlers);
+     }
+     catch (Exception e)
+     {
+       LOG.error("Failed to create message handler for " + message.getMsgId(), e);
+       String error =
+           "Failed to create message handler for " + message.getMsgId()
+               + ", exception: " + e;
+
+       _statusUpdateUtil.logError(message,
+                                  HelixStateMachineEngine.class,
+                                  e,
+                                  error,
+                                  accessor);
+
+       // Mark message state UNPROCESSABLE if we hit an exception in creating
+       // message handler. The message will stay on zookeeper but will not be processed
+       message.setMsgState(MessageState.UNPROCESSABLE);
+       accessor.updateProperty(message.getKey(keyBuilder, instanceName), message);
+       continue;
+     }
+
+     // update msgState to read
+     message.setMsgState(MessageState.READ);
+     message.setReadTimeStamp(new Date().getTime());
+     message.setExecuteSessionId(changeContext.getManager().getSessionId());
+
+     _statusUpdateUtil.logInfo(message,
+                               HelixStateMachineEngine.class,
+                               "New Message",
+                               accessor);
+
+     readMsgs.add(message);
+
+     // batch creation of all current state meta data
+     // do it for non-controller and state transition messages only
+     if (!message.isControlerMsg()
+         && message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString()))
+     {
+       String resourceName = message.getResourceName();
+       if (!curResourceNames.contains(resourceName)
+           && !createCurStateNames.contains(resourceName))
+       {
+         createCurStateNames.add(resourceName);
+         createCurStateKeys.add(keyBuilder.currentState(instanceName,
+                                                        sessionId,
+                                                        resourceName));
+
+         CurrentState metaCurState = new CurrentState(resourceName);
+         metaCurState.setBucketSize(message.getBucketSize());
+         metaCurState.setStateModelDefRef(message.getStateModelDef());
+         metaCurState.setSessionId(sessionId);
+         metaCurState.setGroupMessageMode(message.getGroupMessageMode());
+         String ftyName = message.getStateModelFactoryName();
+         if (ftyName != null)
+         {
+           metaCurState.setStateModelFactoryName(ftyName);
+         }
+         else
+         {
+           metaCurState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+         }
+
+         metaCurStates.add(metaCurState);
+       }
+     }
+   }
+
+   // batch create curState meta
+   if (!createCurStateKeys.isEmpty())
+   {
+     try
+     {
+       accessor.createChildren(createCurStateKeys, metaCurStates);
+     }
+     catch (Exception e)
+     {
+       // Best effort: keep going, but log with context and the stack trace.
+       LOG.error("Failed to create current state meta data for resources "
+           + createCurStateNames, e);
+     }
+   }
+
+   // update message state to READ in batch and schedule all read messages
+   if (!readMsgs.isEmpty())
+   {
+     updateMessageState(readMsgs, accessor, instanceName);
+
+     for (MessageHandler handler : handlers)
+     {
+       scheduleTask(handler._message, handler, changeContext);
+     }
+   }
+ }
+
+ /**
+  * Creates a handler for the message using the factory registered for its
+  * message type.
+  *
+  * @param message       the message to create a handler for
+  * @param changeContext the notification context passed on to the factory
+  * @return the handler produced by the factory, or null when no factory is
+  *         registered for the message type
+  */
+ private MessageHandler createMessageHandler(Message message,
+                                             NotificationContext changeContext)
+ {
+   String type = message.getMsgType().toString();
+   MessageHandlerFactory factory = _handlerFactoryMap.get(type);
+
+   if (factory != null)
+   {
+     return factory.createHandler(message, changeContext);
+   }
+
+   // No factory yet: the message is kept and will be handled once the
+   // corresponding MessageHandlerFactory is registered.
+   LOG.warn("Fail to find message handler factory for type: " + type + " mid:"
+       + message.getMsgId());
+   return null;
+ }
+
+ /**
+  * Creates the handlers needed to process a message.
+  * <p>
+  * A message in group-message mode is registered with the group message
+  * handler and split into one sub-message per partition name, each tagged
+  * with the parent message id; any other message gets a single handler.
+  * Handlers that could not be created (null) are left out.
+  *
+  * @param message       the message to create handlers for
+  * @param changeContext the notification context passed on to the factory
+  * @return the created handlers; empty when none could be created
+  */
+ private List<MessageHandler> createMessageHandlers(Message message,
+                                                    NotificationContext changeContext)
+ {
+   List<MessageHandler> result = new ArrayList<MessageHandler>();
+
+   if (message.getGroupMessageMode())
+   {
+     _groupMsgHandler.put(message);
+
+     for (String partition : message.getPartitionNames())
+     {
+       Message child = new Message(message.getRecord());
+       child.setPartitionName(partition);
+       child.setAttribute(Attributes.PARENT_MSG_ID, message.getId());
+
+       LOG.info("Creating handler for group message " + child.getMsgId() + "/"
+           + partition);
+       MessageHandler childHandler = createMessageHandler(child, changeContext);
+       if (childHandler != null)
+       {
+         result.add(childHandler);
+       }
+     }
+     return result;
+   }
+
+   LOG.info("Creating handler for message " + message.getMsgId() + "/"
+       + message.getPartitionName());
+   MessageHandler single = createMessageHandler(message, changeContext);
+   if (single != null)
+   {
+     result.add(single);
+   }
+   return result;
+ }
+
+ /**
+  * Shuts down every thread pool held by this executor and then the monitor.
+  * <p>
+  * Each pool is stopped with shutdownNow() and then given up to 200
+  * milliseconds to terminate. If the calling thread is interrupted while
+  * waiting, its interrupt status is restored and the remaining waits are
+  * skipped; the monitor is shut down in every case.
+  */
+ public void shutDown()
+ {
+   LOG.info("shutting down TaskExecutor");
+   synchronized (_lock)
+   {
+     for (String msgType : _threadpoolMap.keySet())
+     {
+       List<Runnable> tasksLeft = _threadpoolMap.get(msgType).shutdownNow();
+       LOG.info(tasksLeft.size() + " tasks are still in the threadpool for msgType "
+           + msgType);
+     }
+     for (String msgType : _threadpoolMap.keySet())
+     {
+       try
+       {
+         if (!_threadpoolMap.get(msgType).awaitTermination(200, TimeUnit.MILLISECONDS))
+         {
+           LOG.warn(msgType + " is not fully terminated in 200 MS");
+         }
+       }
+       catch (InterruptedException e)
+       {
+         LOG.error("Interrupted", e);
+         // Preserve the interrupt for the caller and stop waiting: with the
+         // flag set, every further awaitTermination would fail immediately.
+         Thread.currentThread().interrupt();
+         break;
+       }
+     }
+   }
+   _monitor.shutDown();
+   LOG.info("shutdown finished");
+ }
+
+ // TODO: remove this
+ public static void main(String[] args) throws Exception
+ {
+ ExecutorService pool = Executors.newFixedThreadPool(DEFAULT_PARALLEL_TASKS);
+ Future<HelixTaskResult> future;
+ future = pool.submit(new Callable<HelixTaskResult>()
+ {
+
+ @Override
+ public HelixTaskResult call() throws Exception
+ {
+ System.out.println("CMTaskExecutor.main(...).new Callable() {...}.call()");
+ return null;
+ }
+
+ });
+ future = pool.submit(new HelixTask(null, null, null, null));
+ Thread.currentThread().join();
+ System.out.println(future.isDone());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/437eb42e/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
new file mode 100644
index 0000000..ee6919e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.helix.messaging.handling;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Mutable holder for the outcome of a task: a success flag, a message, a
+ * map of result key/value pairs, an interrupted flag and an optional
+ * exception.
+ */
+public class HelixTaskResult
+{
+
+  private boolean _success;
+  private String _message = "";
+  private Map<String, String> _taskResultMap = new HashMap<String, String>();
+  private boolean _interrupted = false;
+  // Kept package-private as declared originally; same-package code may read it directly.
+  Exception _exception = null;
+
+  /**
+   * @return the success flag; false until {@link #setSuccess(boolean)} sets it
+   */
+  public boolean isSuccess()
+  {
+    return _success;
+  }
+
+  /**
+   * Misspelled predecessor of {@link #isSuccess()}, retained so existing
+   * callers keep compiling.
+   *
+   * @return the success flag
+   * @deprecated use {@link #isSuccess()}
+   */
+  @Deprecated
+  public boolean isSucess()
+  {
+    return isSuccess();
+  }
+
+  /**
+   * @return the interrupted flag
+   */
+  public boolean isInterrupted()
+  {
+    return _interrupted;
+  }
+
+  /**
+   * @param interrupted new value of the interrupted flag
+   */
+  public void setInterrupted(boolean interrupted)
+  {
+    _interrupted = interrupted;
+  }
+
+  /**
+   * @param success new value of the success flag
+   */
+  public void setSuccess(boolean success)
+  {
+    this._success = success;
+  }
+
+  /**
+   * @return the result message; the empty string until one is set
+   */
+  public String getMessage()
+  {
+    return _message;
+  }
+
+  /**
+   * @param message the result message
+   */
+  public void setMessage(String message)
+  {
+    this._message = message;
+  }
+
+  /**
+   * @return the live, mutable map of result entries; changes made by the
+   *         caller are reflected in this result
+   */
+  public Map<String, String> getTaskResultMap()
+  {
+    return _taskResultMap;
+  }
+
+  /**
+   * @param e the exception to associate with this result
+   */
+  public void setException(Exception e)
+  {
+    _exception = e;
+  }
+
+  /**
+   * @return the associated exception, or null if none was set
+   */
+  public Exception getException()
+  {
+    return _exception;
+  }
+}