You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2012/10/25 00:26:41 UTC
[19/47] Refactoring from com.linkedin.helix to org.apache.helix
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/manager/zk/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/manager/zk/package-info.java b/helix-core/src/main/java/com/linkedin/helix/manager/zk/package-info.java
deleted file mode 100644
index bb5e7f8..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/manager/zk/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * zookeeper-based implementation of Helix cluster manager
- *
- */
-package com.linkedin.helix.manager.zk;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/messaging/AsyncCallback.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/messaging/AsyncCallback.java b/helix-core/src/main/java/com/linkedin/helix/messaging/AsyncCallback.java
deleted file mode 100644
index 330ec57..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/messaging/AsyncCallback.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.model.Message;
-
-public abstract class AsyncCallback
-{
-
- private static Logger _logger = Logger.getLogger(AsyncCallback.class);
- long _startTimeStamp = 0;
- protected long _timeout = -1;
- Timer _timer = null;
- List<Message> _messagesSent;
- protected final List<Message> _messageReplied = new ArrayList<Message>();
- boolean _timedOut = false;
- boolean _isInterrupted = false;
-
- /**
- * Enforcing timeout to be set
- *
- * @param timeout
- */
- public AsyncCallback(long timeout)
- {
- _logger.info("Setting time out to " + timeout + " ms");
- _timeout = timeout;
- }
-
- public AsyncCallback()
- {
- this(-1);
- }
-
- public final void setTimeout(long timeout)
- {
- _logger.info("Setting time out to " + timeout + " ms");
- _timeout = timeout;
-
- }
-
- public List<Message> getMessageReplied()
- {
- return _messageReplied;
- }
-
- public boolean isInterrupted()
- {
- return _isInterrupted;
- }
-
- public void setInterrupted(boolean b)
- {
- _isInterrupted = true;
- }
-
- public synchronized final void onReply(Message message)
- {
- _logger.info("OnReply msg " + message.getMsgId());
- if (!isDone())
- {
- _messageReplied.add(message);
- try
- {
- onReplyMessage(message);
- }
- catch(Exception e)
- {
- _logger.error(e);
- }
- }
- if (isDone())
- {
- if(_timer != null)
- {
- _timer.cancel();
- }
- notifyAll();
- }
- }
-
- /**
- * Default implementation will wait until every message sent gets a response
- *
- * @return
- */
- public boolean isDone()
- {
- return _messageReplied.size() == _messagesSent.size();
- }
-
- public boolean isTimedOut()
- {
- return _timedOut;
- }
-
- final void setMessagesSent(List<Message> generatedMessage)
- {
- _messagesSent = generatedMessage;
- }
-
- final void startTimer()
- {
- if (_timer == null && _timeout > 0)
- {
- if (_startTimeStamp == 0)
- {
- _startTimeStamp = new Date().getTime();
- }
- _timer = new Timer(true);
- _timer.schedule(new TimeoutTask(this), _timeout);
- }
- }
-
- public abstract void onTimeOut();
-
- public abstract void onReplyMessage(Message message);
-
- class TimeoutTask extends TimerTask
- {
- AsyncCallback _callback;
-
- public TimeoutTask(AsyncCallback asyncCallback)
- {
- _callback = asyncCallback;
- }
-
- @Override
- public void run()
- {
- try
- {
- synchronized (_callback)
- {
- _callback._timedOut = true;
- _callback.notifyAll();
- _callback.onTimeOut();
- }
- }
- catch (Exception e)
- {
- _logger.error(e);
- }
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/messaging/CriteriaEvaluator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/messaging/CriteriaEvaluator.java b/helix-core/src/main/java/com/linkedin/helix/messaging/CriteriaEvaluator.java
deleted file mode 100644
index 6de0d64..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/messaging/CriteriaEvaluator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-import org.josql.Query;
-import org.josql.QueryExecutionException;
-import org.josql.QueryParseException;
-import org.josql.QueryResults;
-
-import com.linkedin.helix.DataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.Criteria;
-import com.linkedin.helix.PropertyType;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.Criteria.DataSource;
-import com.linkedin.helix.josql.ClusterJosqlQueryProcessor;
-import com.linkedin.helix.josql.ZNRecordRow;
-
-public class CriteriaEvaluator
-{
- private static Logger logger = Logger.getLogger(CriteriaEvaluator.class);
-
- public List<Map<String, String>> evaluateCriteria(Criteria recipientCriteria, HelixManager manager)
- {
- List<Map<String, String>> selected = new ArrayList<Map<String, String>>();
-
- String queryFields =
- (!recipientCriteria.getInstanceName().equals("") ? " " + ZNRecordRow.MAP_SUBKEY : " ''") +","+
- (!recipientCriteria.getResource().equals("") ? " " + ZNRecordRow.ZNRECORD_ID : " ''") +","+
- (!recipientCriteria.getPartition().equals("") ? " " + ZNRecordRow.MAP_KEY : " ''") +","+
- (!recipientCriteria.getPartitionState().equals("") ? " " + ZNRecordRow.MAP_VALUE : " '' ");
-
- String matchCondition =
- ZNRecordRow.MAP_SUBKEY + " LIKE '" + (!recipientCriteria.getInstanceName().equals("") ? (recipientCriteria.getInstanceName() +"'") : "%' ") + " AND "+
- ZNRecordRow.ZNRECORD_ID+ " LIKE '" + (!recipientCriteria.getResource().equals("") ? (recipientCriteria.getResource() +"'") : "%' ") + " AND "+
- ZNRecordRow.MAP_KEY + " LIKE '" + (!recipientCriteria.getPartition().equals("") ? (recipientCriteria.getPartition() +"'") : "%' ") + " AND "+
- ZNRecordRow.MAP_VALUE + " LIKE '" + (!recipientCriteria.getPartitionState().equals("") ? (recipientCriteria.getPartitionState()+"'") : "%' ") + " AND "+
- ZNRecordRow.MAP_SUBKEY + " IN ((SELECT [*]id FROM :LIVEINSTANCES))";
-
-
- String queryTarget = recipientCriteria.getDataSource().toString() + ClusterJosqlQueryProcessor.FLATTABLE;
-
- String josql = "SELECT DISTINCT " + queryFields
- + " FROM " + queryTarget + " WHERE "
- + matchCondition;
- ClusterJosqlQueryProcessor p = new ClusterJosqlQueryProcessor(manager);
- List<Object> result = new ArrayList<Object>();
- try
- {
- logger.info("JOSQL query: " + josql);
- result = p.runJoSqlQuery(josql, null, null);
- }
- catch (Exception e)
- {
- logger.error("", e);
- return selected;
- }
-
- for(Object o : result)
- {
- Map<String, String> resultRow = new HashMap<String, String>();
- List<Object> row = (List<Object>)o;
- resultRow.put("instanceName", (String)(row.get(0)));
- resultRow.put("resourceName", (String)(row.get(1)));
- resultRow.put("partitionName", (String)(row.get(2)));
- resultRow.put("partitionState", (String)(row.get(3)));
- selected.add(resultRow);
- }
- logger.info("JOSQL query return " + selected.size() + " rows");
- return selected;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/com/linkedin/helix/messaging/DefaultMessagingService.java
deleted file mode 100644
index d98b3a4..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/messaging/DefaultMessagingService.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ClusterMessagingService;
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScopeBuilder;
-import com.linkedin.helix.Criteria;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.messaging.handling.AsyncCallbackService;
-import com.linkedin.helix.messaging.handling.HelixTaskExecutor;
-import com.linkedin.helix.messaging.handling.MessageHandlerFactory;
-import com.linkedin.helix.model.LiveInstance;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageType;
-
-public class DefaultMessagingService implements ClusterMessagingService
-{
- private final HelixManager _manager;
- private final CriteriaEvaluator _evaluator;
- private final HelixTaskExecutor _taskExecutor;
- // TODO:rename to factory, this is not a service
- private final AsyncCallbackService _asyncCallbackService;
- private static Logger _logger =
- Logger.getLogger(DefaultMessagingService.class);
- ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded
- = new ConcurrentHashMap<String, MessageHandlerFactory>();
-
- public DefaultMessagingService(HelixManager manager)
- {
- _manager = manager;
- _evaluator = new CriteriaEvaluator();
- _taskExecutor = new HelixTaskExecutor();
- _asyncCallbackService = new AsyncCallbackService();
- _taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(),
- _asyncCallbackService);
- }
-
- @Override
- public int send(Criteria recipientCriteria, final Message messageTemplate)
- {
- return send(recipientCriteria, messageTemplate, null, -1);
- }
-
- @Override
- public int send(final Criteria recipientCriteria,
- final Message message,
- AsyncCallback callbackOnReply,
- int timeOut)
- {
- return send(recipientCriteria, message, callbackOnReply, timeOut, 0);
- }
-
- @Override
- public int send(final Criteria recipientCriteria,
- final Message message,
- AsyncCallback callbackOnReply,
- int timeOut,
- int retryCount)
- {
- Map<InstanceType, List<Message>> generateMessage =
- generateMessage(recipientCriteria, message);
- int totalMessageCount = 0;
- for (List<Message> messages : generateMessage.values())
- {
- totalMessageCount += messages.size();
- }
- _logger.info("Send " + totalMessageCount + " messages with criteria "
- + recipientCriteria);
- if (totalMessageCount == 0)
- {
- return 0;
- }
- String correlationId = null;
- if (callbackOnReply != null)
- {
- int totalTimeout = timeOut * (retryCount + 1);
- if (totalTimeout < 0)
- {
- totalTimeout = -1;
- }
- callbackOnReply.setTimeout(totalTimeout);
- correlationId = UUID.randomUUID().toString();
- for (List<Message> messages : generateMessage.values())
- {
- callbackOnReply.setMessagesSent(messages);
- }
- _asyncCallbackService.registerAsyncCallback(correlationId, callbackOnReply);
- }
-
- for (InstanceType receiverType : generateMessage.keySet())
- {
- List<Message> list = generateMessage.get(receiverType);
- for (Message tempMessage : list)
- {
- tempMessage.setRetryCount(retryCount);
- tempMessage.setExecutionTimeout(timeOut);
- tempMessage.setSrcInstanceType(_manager.getInstanceType());
- if (correlationId != null)
- {
- tempMessage.setCorrelationId(correlationId);
- }
-
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- if (receiverType == InstanceType.CONTROLLER)
- {
- // _manager.getDataAccessor().setProperty(PropertyType.MESSAGES_CONTROLLER,
- // tempMessage,
- // tempMessage.getId());
- accessor.setProperty(keyBuilder.controllerMessage(tempMessage.getId()),
- tempMessage);
- }
-
- if (receiverType == InstanceType.PARTICIPANT)
- {
- accessor.setProperty(keyBuilder.message(tempMessage.getTgtName(),
- tempMessage.getId()),
- tempMessage);
- }
- }
- }
-
- if (callbackOnReply != null)
- {
- // start timer if timeout is set
- callbackOnReply.startTimer();
- }
- return totalMessageCount;
- }
-
- private Map<InstanceType, List<Message>> generateMessage(final Criteria recipientCriteria,
- final Message message)
- {
- Map<InstanceType, List<Message>> messagesToSendMap =
- new HashMap<InstanceType, List<Message>>();
- InstanceType instanceType = recipientCriteria.getRecipientInstanceType();
-
- if (instanceType == InstanceType.CONTROLLER)
- {
- List<Message> messages = generateMessagesForController(message);
- messagesToSendMap.put(InstanceType.CONTROLLER, messages);
- // _dataAccessor.setControllerProperty(PropertyType.MESSAGES,
- // newMessage.getRecord(), CreateMode.PERSISTENT);
- }
- else if (instanceType == InstanceType.PARTICIPANT)
- {
- List<Message> messages = new ArrayList<Message>();
- List<Map<String, String>> matchedList =
- _evaluator.evaluateCriteria(recipientCriteria, _manager);
-
- if (!matchedList.isEmpty())
- {
- Map<String, String> sessionIdMap = new HashMap<String, String>();
- if (recipientCriteria.isSessionSpecific())
- {
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- List<LiveInstance> liveInstances =
- accessor.getChildValues(keyBuilder.liveInstances());
-
- for (LiveInstance liveInstance : liveInstances)
- {
- sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
- }
- }
- for (Map<String, String> map : matchedList)
- {
- String id = UUID.randomUUID().toString();
- Message newMessage = new Message(message.getRecord(), id);
- String srcInstanceName = _manager.getInstanceName();
- String tgtInstanceName = map.get("instanceName");
- // Don't send message to self
- if (recipientCriteria.isSelfExcluded()
- && srcInstanceName.equalsIgnoreCase(tgtInstanceName))
- {
- continue;
- }
- newMessage.setSrcName(srcInstanceName);
- newMessage.setTgtName(tgtInstanceName);
- newMessage.setResourceName(map.get("resourceName"));
- newMessage.setPartitionName(map.get("partitionName"));
- if (recipientCriteria.isSessionSpecific())
- {
- newMessage.setTgtSessionId(sessionIdMap.get(tgtInstanceName));
- }
- messages.add(newMessage);
- }
- messagesToSendMap.put(InstanceType.PARTICIPANT, messages);
- }
- }
- return messagesToSendMap;
- }
-
- private List<Message> generateMessagesForController(Message message)
- {
- List<Message> messages = new ArrayList<Message>();
- String id = UUID.randomUUID().toString();
- Message newMessage = new Message(message.getRecord(), id);
- newMessage.setMsgId(id);
- newMessage.setSrcName(_manager.getInstanceName());
- newMessage.setTgtName("Controller");
- messages.add(newMessage);
- return messages;
- }
-
- @Override
- public synchronized void registerMessageHandlerFactory(String type, MessageHandlerFactory factory)
- {
- if (_manager.isConnected())
- {
- registerMessageHandlerFactoryInternal(type, factory);
- }
- else
- {
- _messageHandlerFactoriestobeAdded.put(type, factory);
- }
- }
-
- public synchronized void onConnected()
- {
- for(String type : _messageHandlerFactoriestobeAdded.keySet())
- {
- registerMessageHandlerFactoryInternal(type, _messageHandlerFactoriestobeAdded.get(type));
- }
- _messageHandlerFactoriestobeAdded.clear();
- }
-
- void registerMessageHandlerFactoryInternal(String type, MessageHandlerFactory factory)
- {
- _logger.info("registering msg factory for type " + type);
- int threadpoolSize = HelixTaskExecutor.DEFAULT_PARALLEL_TASKS;
- String threadpoolSizeStr = null;
- String key = type + "." + HelixTaskExecutor.MAX_THREADS;
-
- ConfigAccessor configAccessor = _manager.getConfigAccessor();
- if(configAccessor != null)
- {
- ConfigScope scope = null;
-
- // Read the participant config and cluster config for the per-message type thread pool size.
- // participant config will override the cluster config.
-
- if(_manager.getInstanceType() == InstanceType.PARTICIPANT || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
- {
- scope = new ConfigScopeBuilder().forCluster(_manager.getClusterName()).forParticipant(_manager.getInstanceName()).build();
- threadpoolSizeStr = configAccessor.get(scope, key);
- }
-
- if(threadpoolSizeStr == null)
- {
- scope = new ConfigScopeBuilder().forCluster(_manager.getClusterName()).build();
- threadpoolSizeStr = configAccessor.get(scope, key);
- }
- }
-
- if(threadpoolSizeStr != null)
- {
- try
- {
- threadpoolSize = Integer.parseInt(threadpoolSizeStr);
- if(threadpoolSize <= 0)
- {
- threadpoolSize = 1;
- }
- }
- catch(Exception e)
- {
- _logger.error("", e);
- }
- }
-
- _taskExecutor.registerMessageHandlerFactory(type, factory, threadpoolSize);
- // Self-send a no-op message, so that the onMessage() call will be invoked
- // again, and
- // we have a chance to process the message that we received with the new
- // added MessageHandlerFactory
- // before the factory is added.
- sendNopMessage();
- }
-
- public void sendNopMessage()
- {
- if (_manager.isConnected())
- {
- try
- {
- Message nopMsg = new Message(MessageType.NO_OP, UUID.randomUUID().toString());
- nopMsg.setSrcName(_manager.getInstanceName());
-
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- if (_manager.getInstanceType() == InstanceType.CONTROLLER
- || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
- {
- nopMsg.setTgtName("Controller");
- accessor.setProperty(keyBuilder.controllerMessage(nopMsg.getId()), nopMsg);
- }
-
- if (_manager.getInstanceType() == InstanceType.PARTICIPANT
- || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
- {
- nopMsg.setTgtName(_manager.getInstanceName());
- accessor.setProperty(keyBuilder.message(nopMsg.getTgtName(), nopMsg.getId()),
- nopMsg);
- }
- }
- catch (Exception e)
- {
- _logger.error(e);
- }
- }
- }
-
- public HelixTaskExecutor getExecutor()
- {
- return _taskExecutor;
- }
-
- @Override
- public int sendAndWait(Criteria receipientCriteria,
- Message message,
- AsyncCallback asyncCallback,
- int timeOut,
- int retryCount)
- {
- int messagesSent =
- send(receipientCriteria, message, asyncCallback, timeOut, retryCount);
- if (messagesSent > 0)
- {
- while (!asyncCallback.isDone() && !asyncCallback.isTimedOut())
- {
- synchronized (asyncCallback)
- {
- try
- {
- asyncCallback.wait();
- }
- catch (InterruptedException e)
- {
- _logger.error(e);
- asyncCallback.setInterrupted(true);
- break;
- }
- }
- }
- }
- else
- {
- _logger.warn("No messages sent. For Criteria:" + receipientCriteria);
- }
- return messagesSent;
- }
-
- @Override
- public int sendAndWait(Criteria recipientCriteria,
- Message message,
- AsyncCallback asyncCallback,
- int timeOut)
- {
- return sendAndWait(recipientCriteria, message, asyncCallback, timeOut, 0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/messaging/handling/AsyncCallbackService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/AsyncCallbackService.java b/helix-core/src/main/java/com/linkedin/helix/messaging/handling/AsyncCallbackService.java
deleted file mode 100644
index 078ce08..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/AsyncCallbackService.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging.handling;
-
-import java.util.Collection;
-import java.util.Date;
-import java.util.Map;
-import java.util.Random;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.messaging.AsyncCallback;
-import com.linkedin.helix.messaging.handling.MessageHandler.ErrorCode;
-import com.linkedin.helix.messaging.handling.MessageHandler.ErrorType;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.participant.HelixStateMachineEngine;
-
-public class AsyncCallbackService implements MessageHandlerFactory
-{
- private final ConcurrentHashMap<String, AsyncCallback> _callbackMap = new ConcurrentHashMap<String, AsyncCallback>();
- private static Logger _logger = Logger.getLogger(AsyncCallbackService.class);
-
- public AsyncCallbackService()
- {
- }
-
- public void registerAsyncCallback(String correlationId, AsyncCallback callback)
- {
- if (_callbackMap.containsKey(correlationId))
- {
- _logger.warn("correlation id " + correlationId + " already registered");
- }
- _logger.info("registering correlation id " + correlationId);
- _callbackMap.put(correlationId, callback);
- }
-
- void verifyMessage(Message message)
- {
- if (!message.getMsgType().toString()
- .equalsIgnoreCase(MessageType.TASK_REPLY.toString()))
- {
- String errorMsg = "Unexpected msg type for message " + message.getMsgId()
- + " type:" + message.getMsgType() + " Expected : "
- + MessageType.TASK_REPLY;
- _logger.error(errorMsg);
- throw new HelixException(errorMsg);
- }
- String correlationId = message.getCorrelationId();
- if (correlationId == null)
- {
- String errorMsg = "Message " + message.getMsgId()
- + " does not have correlation id";
- _logger.error(errorMsg);
- throw new HelixException(errorMsg);
- }
-
- if (!_callbackMap.containsKey(correlationId))
- {
- String errorMsg = "Message "
- + message.getMsgId()
- + " does not have correponding callback. Probably timed out already. Correlation id: "
- + correlationId;
- _logger.error(errorMsg);
- throw new HelixException(errorMsg);
- }
- _logger.info("Verified reply message " + message.getMsgId()
- + " correlation:" + correlationId);
- }
-
- @Override
- public MessageHandler createHandler(Message message,
- NotificationContext context)
- {
- verifyMessage(message);
- return new AsyncCallbackMessageHandler(message.getCorrelationId(),message, context);
- }
-
- @Override
- public String getMessageType()
- {
- return MessageType.TASK_REPLY.toString();
- }
-
- @Override
- public void reset()
- {
-
- }
-
- public class AsyncCallbackMessageHandler extends MessageHandler
- {
- private final String _correlationId;
-
- public AsyncCallbackMessageHandler(String correlationId, Message message, NotificationContext context)
- {
- super(message, context);
- _correlationId = correlationId;
- }
-
- @Override
- public HelixTaskResult handleMessage() throws InterruptedException
- {
- verifyMessage(_message);
- HelixTaskResult result = new HelixTaskResult();
- assert (_correlationId.equalsIgnoreCase(_message.getCorrelationId()));
- _logger.info("invoking reply message " + _message.getMsgId()
- + ", correlationid:" + _correlationId);
-
- AsyncCallback callback = _callbackMap.get(_correlationId);
- synchronized (callback)
- {
- callback.onReply(_message);
- if (callback.isDone())
- {
- _logger.info("Removing finished callback, correlationid:"
- + _correlationId);
- _callbackMap.remove(_correlationId);
- }
- }
- result.setSuccess(true);
- return result;
- }
-
- @Override
- public void onError(Exception e, ErrorCode code, ErrorType type)
- {
- _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/messaging/handling/GroupMessageHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/GroupMessageHandler.java b/helix-core/src/main/java/com/linkedin/helix/messaging/handling/GroupMessageHandler.java
deleted file mode 100644
index 4b2dfcd..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/GroupMessageHandler.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package com.linkedin.helix.messaging.handling;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.Attributes;
-
-public class GroupMessageHandler
-{
- class CurrentStateUpdate
- {
- final PropertyKey _key;
- final CurrentState _curStateDelta;
-
- public CurrentStateUpdate(PropertyKey key, CurrentState curStateDelta)
- {
- _key = key;
- _curStateDelta = curStateDelta;
- }
-
- public void merge(CurrentState curState)
- {
- _curStateDelta.getRecord().merge(curState.getRecord());
- }
- }
-
- static class GroupMessageInfo
- {
- final Message _message;
- final AtomicInteger _countDown;
- final ConcurrentLinkedQueue<CurrentStateUpdate> _curStateUpdateList;
-
- public GroupMessageInfo(Message message)
- {
- _message = message;
- List<String> partitionNames = message.getPartitionNames();
- _countDown = new AtomicInteger(partitionNames.size());
- _curStateUpdateList = new ConcurrentLinkedQueue<CurrentStateUpdate>();
- }
-
- public Map<PropertyKey, CurrentState> merge()
- {
- Map<String, CurrentStateUpdate> curStateUpdateMap =
- new HashMap<String, CurrentStateUpdate>();
- for (CurrentStateUpdate update : _curStateUpdateList)
- {
- String path = update._key.getPath();
- if (!curStateUpdateMap.containsKey(path))
- {
- curStateUpdateMap.put(path, update);
- }
- else
- {
- curStateUpdateMap.get(path).merge(update._curStateDelta);
- }
- }
-
- Map<PropertyKey, CurrentState> ret = new HashMap<PropertyKey, CurrentState>();
- for (CurrentStateUpdate update : curStateUpdateMap.values())
- {
- ret.put(update._key, update._curStateDelta);
- }
-
- return ret;
- }
-
- }
-
- final ConcurrentHashMap<String, GroupMessageInfo> _groupMsgMap;
-
- public GroupMessageHandler()
- {
- _groupMsgMap = new ConcurrentHashMap<String, GroupMessageInfo>();
- }
-
- public void put(Message message)
- {
- _groupMsgMap.putIfAbsent(message.getId(), new GroupMessageInfo(message));
- }
-
- // return non-null if all sub-messages are completed
- public GroupMessageInfo onCompleteSubMessage(Message subMessage)
- {
- String parentMid = subMessage.getAttribute(Attributes.PARENT_MSG_ID);
- GroupMessageInfo info = _groupMsgMap.get(parentMid);
- if (info != null)
- {
- int val = info._countDown.decrementAndGet();
- if (val <= 0)
- {
- return _groupMsgMap.remove(parentMid);
- }
- }
-
- return null;
- }
-
- void addCurStateUpdate(Message subMessage, PropertyKey key, CurrentState delta)
- {
- String parentMid = subMessage.getAttribute(Attributes.PARENT_MSG_ID);
- GroupMessageInfo info = _groupMsgMap.get(parentMid);
- if (info != null)
- {
- info._curStateUpdateList.add(new CurrentStateUpdate(key, delta));
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixStateTransitionHandler.java
deleted file mode 100644
index 0df7e16..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixStateTransitionHandler.java
+++ /dev/null
@@ -1,388 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging.handling;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.ZNRecordBucketizer;
-import com.linkedin.helix.ZNRecordDelta;
-import com.linkedin.helix.ZNRecordDelta.MergeOperation;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.participant.statemachine.StateModel;
-import com.linkedin.helix.participant.statemachine.StateModelParser;
-import com.linkedin.helix.participant.statemachine.StateTransitionError;
-import com.linkedin.helix.util.StatusUpdateUtil;
-
-public class HelixStateTransitionHandler extends MessageHandler
-{
- public static class HelixStateMismatchException extends Exception
- {
- public HelixStateMismatchException(String info)
- {
- super(info);
- }
- }
- private static Logger logger =
- Logger.getLogger(HelixStateTransitionHandler.class);
- private final StateModel _stateModel;
- StatusUpdateUtil _statusUpdateUtil;
- private final StateModelParser _transitionMethodFinder;
- private final CurrentState _currentStateDelta;
- volatile boolean _isTimeout = false;
- private final HelixTaskExecutor _executor;
-
- public HelixStateTransitionHandler(StateModel stateModel,
- Message message,
- NotificationContext context,
- CurrentState currentStateDelta,
- HelixTaskExecutor executor)
- {
- super(message, context);
- _stateModel = stateModel;
- _statusUpdateUtil = new StatusUpdateUtil();
- _transitionMethodFinder = new StateModelParser();
- _currentStateDelta = currentStateDelta;
- _executor = executor;
- }
-
- private void prepareMessageExecution(HelixManager manager, Message message) throws HelixException,
- HelixStateMismatchException
- {
- if (!message.isValid())
- {
- String errorMessage =
- "Invalid Message, ensure that message: " + message
- + " has all the required fields: "
- + Arrays.toString(Message.Attributes.values());
-
- _statusUpdateUtil.logError(message,
- HelixStateTransitionHandler.class,
- errorMessage,
- manager.getHelixDataAccessor());
- logger.error(errorMessage);
- throw new HelixException(errorMessage);
- }
- // DataAccessor accessor = manager.getDataAccessor();
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
-
- String partitionName = message.getPartitionName();
- String fromState = message.getFromState();
-
- // Verify the fromState and current state of the stateModel
- String state = _currentStateDelta.getState(partitionName);
-
- if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state))
- {
- String errorMessage =
- "Current state of stateModel does not match the fromState in Message"
- + ", Current State:" + state + ", message expected:" + fromState
- + ", partition: " + partitionName + ", from: " + message.getMsgSrc()
- + ", to: " + message.getTgtName();
-
- _statusUpdateUtil.logError(message,
- HelixStateTransitionHandler.class,
- errorMessage,
- accessor);
- logger.error(errorMessage);
- throw new HelixStateMismatchException(errorMessage);
- }
- }
-
- void postExecutionMessage(HelixManager manager,
- Message message,
- NotificationContext context,
- HelixTaskResult taskResult,
- Exception exception)
- {
- String partitionKey = message.getPartitionName();
- String resource = message.getResourceName();
- String sessionId = message.getTgtSessionId();
- String instanceName = manager.getInstanceName();
-
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- int bucketSize = message.getBucketSize();
- ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(bucketSize);
-
- // Lock the helix manager so that the session id will not change when we update
- // the state model state. for zk current state it is OK as we have the per-session
- // current state node
- synchronized (manager)
- {
- if (!message.getTgtSessionId().equals(manager.getSessionId()))
- {
- logger.warn("Session id has changed. Skip postExecutionMessage. Old session "
- + message.getExecutionSessionId() + " , new session : "
- + manager.getSessionId());
- return;
- }
-
- if (taskResult.isSucess())
- {
- // String fromState = message.getFromState();
- String toState = message.getToState();
- _currentStateDelta.setState(partitionKey, toState);
-
- if (toState.equalsIgnoreCase("DROPPED"))
- {
- // for "OnOfflineToDROPPED" message, we need to remove the resource key record
- // from the current state of the instance because the resource key is dropped.
- // In the state model it will be stayed as "OFFLINE", which is OK.
- ZNRecordDelta delta =
- new ZNRecordDelta(_currentStateDelta.getRecord(), MergeOperation.SUBTRACT);
- // Don't subtract simple fields since they contain stateModelDefRef
- delta._record.getSimpleFields().clear();
-
- List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
- deltaList.add(delta);
- _currentStateDelta.setDeltaList(deltaList);
- }
- else
- {
- // if the partition is not to be dropped, update _stateModel to the TO_STATE
- _stateModel.updateState(toState);
- }
- }
- else
- {
- if (exception instanceof HelixStateMismatchException)
- {
- // if fromState mismatch, set current state on zk to stateModel's current state
- logger.warn("Force CurrentState on Zk to be stateModel's CurrentState. partitionKey: "
- + partitionKey
- + ", currentState: "
- + _stateModel.getCurrentState()
- + ", message: " + message);
- _currentStateDelta.setState(partitionKey, _stateModel.getCurrentState());
- }
- else
- {
- StateTransitionError error =
- new StateTransitionError(ErrorType.INTERNAL, ErrorCode.ERROR, exception);
- if (exception instanceof InterruptedException)
- {
- if (_isTimeout)
- {
- error =
- new StateTransitionError(ErrorType.INTERNAL,
- ErrorCode.TIMEOUT,
- exception);
- }
- else
- {
- // State transition interrupted but not caused by timeout. Keep the current
- // state in this case
- logger.error("State transition interrupted but not timeout. Not updating state. Partition : "
- + message.getPartitionName() + " MsgId : " + message.getMsgId());
- return;
- }
- }
- _stateModel.rollbackOnError(message, context, error);
- _currentStateDelta.setState(partitionKey, "ERROR");
- _stateModel.updateState("ERROR");
- }
- }
- }
- try
- {
- // Update the ZK current state of the node
- PropertyKey key = keyBuilder.currentState(instanceName,
- sessionId,
- resource,
- bucketizer.getBucketName(partitionKey));
- if (!_message.getGroupMessageMode())
- {
- accessor.updateProperty(key, _currentStateDelta);
- }
- else
- {
- _executor._groupMsgHandler.addCurStateUpdate(_message, key, _currentStateDelta);
- }
- }
- catch (Exception e)
- {
- logger.error("Error when updating the state ", e);
- StateTransitionError error =
- new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
- _stateModel.rollbackOnError(message, context, error);
- _statusUpdateUtil.logError(message,
- HelixStateTransitionHandler.class,
- e,
- "Error when update the state ",
- accessor);
- }
- }
-
- public HelixTaskResult handleMessageInternal(Message message,
- NotificationContext context)
- {
- synchronized (_stateModel)
- {
- HelixTaskResult taskResult = new HelixTaskResult();
- HelixManager manager = context.getManager();
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
-
- _statusUpdateUtil.logInfo(message,
- HelixStateTransitionHandler.class,
- "Message handling task begin execute",
- accessor);
- message.setExecuteStartTimeStamp(new Date().getTime());
-
- Exception exception = null;
- try
- {
- prepareMessageExecution(manager, message);
- invoke(accessor, context, taskResult, message);
- }
- catch (HelixStateMismatchException e)
- {
- // Simply log error and return from here if State mismatch.
- // The current state of the state model is intact.
- taskResult.setSuccess(false);
- taskResult.setMessage(e.toString());
- taskResult.setException(e);
- exception = e;
- // return taskResult;
- }
- catch (Exception e)
- {
- String errorMessage =
- "Exception while executing a state transition task "
- + message.getPartitionName();
- logger.error(errorMessage, e);
- if (e.getCause() != null && e.getCause() instanceof InterruptedException)
- {
- e = (InterruptedException) e.getCause();
- }
- _statusUpdateUtil.logError(message,
- HelixStateTransitionHandler.class,
- e,
- errorMessage,
- accessor);
- taskResult.setSuccess(false);
- taskResult.setMessage(e.toString());
- taskResult.setException(e);
- taskResult.setInterrupted(e instanceof InterruptedException);
- exception = e;
- }
- postExecutionMessage(manager, message, context, taskResult, exception);
-
- return taskResult;
- }
- }
-
- private void invoke(HelixDataAccessor accessor,
- NotificationContext context,
- HelixTaskResult taskResult,
- Message message) throws IllegalAccessException,
- InvocationTargetException,
- InterruptedException
- {
- _statusUpdateUtil.logInfo(message,
- HelixStateTransitionHandler.class,
- "Message handling invoking",
- accessor);
-
- // by default, we invoke state transition function in state model
- Method methodToInvoke = null;
- String fromState = message.getFromState();
- String toState = message.getToState();
- methodToInvoke =
- _transitionMethodFinder.getMethodForTransition(_stateModel.getClass(),
- fromState,
- toState,
- new Class[] { Message.class,
- NotificationContext.class });
- if (methodToInvoke != null)
- {
- methodToInvoke.invoke(_stateModel, new Object[] { message, context });
- taskResult.setSuccess(true);
- }
- else
- {
- String errorMessage =
- "Unable to find method for transition from " + fromState + " to " + toState
- + "in " + _stateModel.getClass();
- logger.error(errorMessage);
- taskResult.setSuccess(false);
-
- _statusUpdateUtil.logError(message,
- HelixStateTransitionHandler.class,
- errorMessage,
- accessor);
- }
- }
-
- @Override
- public HelixTaskResult handleMessage()
- {
- return handleMessageInternal(_message, _notificationContext);
- }
-
- @Override
- public void onError(Exception e, ErrorCode code, ErrorType type)
- {
- // All internal error has been processed already, so we can skip them
- if (type == ErrorType.INTERNAL)
- {
- logger.error("Skip internal error " + e.getMessage() + " " + code);
- return;
- }
- HelixManager manager = _notificationContext.getManager();
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- String instanceName = manager.getInstanceName();
- String partition = _message.getPartitionName();
- String resourceName = _message.getResourceName();
- CurrentState currentStateDelta = new CurrentState(resourceName);
-
- StateTransitionError error = new StateTransitionError(type, code, e);
- _stateModel.rollbackOnError(_message, _notificationContext, error);
- // if the transition is not canceled, it should go into error state
- if (code == ErrorCode.ERROR)
- {
- currentStateDelta.setState(partition, "ERROR");
- _stateModel.updateState("ERROR");
-
- accessor.updateProperty(keyBuilder.currentState(instanceName,
- _message.getTgtSessionId(),
- resourceName),
- currentStateDelta);
- }
- }
-
- @Override
- public void onTimeout()
- {
- _isTimeout = true;
- }
-};
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixTask.java
deleted file mode 100644
index 73c622c..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixTask.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging.handling;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.Callable;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.InstanceType;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.messaging.handling.GroupMessageHandler.GroupMessageInfo;
-import com.linkedin.helix.messaging.handling.MessageHandler.ErrorCode;
-import com.linkedin.helix.messaging.handling.MessageHandler.ErrorType;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.monitoring.StateTransitionContext;
-import com.linkedin.helix.monitoring.StateTransitionDataPoint;
-import com.linkedin.helix.util.StatusUpdateUtil;
-
-public class HelixTask implements Callable<HelixTaskResult>
-{
- private static Logger logger = Logger.getLogger(HelixTask.class);
- private final Message _message;
- private final MessageHandler _handler;
- private final NotificationContext _notificationContext;
- private final HelixManager _manager;
- StatusUpdateUtil _statusUpdateUtil;
- HelixTaskExecutor _executor;
- volatile boolean _isTimeout = false;
-
- public class TimeoutCancelTask extends TimerTask
- {
- HelixTaskExecutor _executor;
- Message _message;
- NotificationContext _context;
-
- public TimeoutCancelTask(HelixTaskExecutor executor,
- Message message,
- NotificationContext context)
- {
- _executor = executor;
- _message = message;
- _context = context;
- }
-
- @Override
- public void run()
- {
- _isTimeout = true;
- logger.warn("Message time out, canceling. id:" + _message.getMsgId()
- + " timeout : " + _message.getExecutionTimeout());
- _handler.onTimeout();
- _executor.cancelTask(_message, _context);
- }
-
- }
-
- public HelixTask(Message message,
- NotificationContext notificationContext,
- MessageHandler handler,
- HelixTaskExecutor executor) throws Exception
- {
- this._notificationContext = notificationContext;
- this._message = message;
- this._handler = handler;
- this._manager = notificationContext.getManager();
- _statusUpdateUtil = new StatusUpdateUtil();
- _executor = executor;
- }
-
- @Override
- public HelixTaskResult call()
- {
- // Start the timeout TimerTask, if necessary
- Timer timer = null;
- if (_message.getExecutionTimeout() > 0)
- {
- timer = new Timer(true);
- timer.schedule(new TimeoutCancelTask(_executor, _message, _notificationContext),
- _message.getExecutionTimeout());
- logger.info("Message starts with timeout " + _message.getExecutionTimeout()
- + " MsgId:" + _message.getMsgId());
- }
- else
- {
- logger.info("Message does not have timeout. MsgId:" + _message.getMsgId() + "/"
- + _message.getPartitionName());
- }
-
- HelixTaskResult taskResult = new HelixTaskResult();
-
- Exception exception = null;
- ErrorType type = ErrorType.INTERNAL;
- ErrorCode code = ErrorCode.ERROR;
-
- long start = System.currentTimeMillis();
- logger.info("msg:" + _message.getMsgId() + " handling task begin, at: " + start);
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- _statusUpdateUtil.logInfo(_message,
- HelixTask.class,
- "Message handling task begin execute",
- accessor);
- _message.setExecuteStartTimeStamp(new Date().getTime());
-
- // Handle the message
- try
- {
- taskResult = _handler.handleMessage();
- exception = taskResult.getException();
- }
- catch (InterruptedException e)
- {
- _statusUpdateUtil.logError(_message,
- HelixTask.class,
- e,
- "State transition interrupted, timeout:" + _isTimeout,
- accessor);
- logger.info("Message " + _message.getMsgId() + " is interrupted");
- taskResult.setInterrupted(true);
- taskResult.setException(e);
- exception = e;
- }
- catch (Exception e)
- {
- String errorMessage =
- "Exception while executing a message. " + e + " msgId: " + _message.getMsgId()
- + " type: " + _message.getMsgType();
- logger.error(errorMessage, e);
- _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, accessor);
- taskResult.setSuccess(false);
- taskResult.setException(e);
- taskResult.setMessage(e.getMessage());
- exception = e;
- }
-
- // Cancel the timer since the handling is done
- // it is fine if the TimerTask for canceling is called already
- if (timer != null)
- {
- timer.cancel();
- }
-
- if (taskResult.isSucess())
- {
- _statusUpdateUtil.logInfo(_message,
- _handler.getClass(),
- "Message handling task completed successfully",
- accessor);
- logger.info("Message " + _message.getMsgId() + " completed.");
- }
- else if (taskResult.isInterrupted())
- {
- logger.info("Message " + _message.getMsgId() + " is interrupted");
- code = _isTimeout ? ErrorCode.TIMEOUT : ErrorCode.CANCEL;
- if (_isTimeout)
- {
- int retryCount = _message.getRetryCount();
- logger.info("Message timeout, retry count: " + retryCount + " MSGID:"
- + _message.getMsgId());
- _statusUpdateUtil.logInfo(_message,
- _handler.getClass(),
- "Message handling task timeout, retryCount:"
- + retryCount,
- accessor);
- // Notify the handler that timeout happens, and the number of retries left
- // In case timeout happens (time out and also interrupted)
- // we should retry the execution of the message by re-schedule it in
- if (retryCount > 0)
- {
- _message.setRetryCount(retryCount - 1);
- _executor.scheduleTask(_message, _handler, _notificationContext);
- return taskResult;
- }
- }
- }
- else
- // logging for errors
- {
- String errorMsg =
- "Message execution failed. msgId: " + _message.getMsgId()
- + taskResult.getMessage();
- if (exception != null)
- {
- errorMsg += exception;
- }
- logger.error(errorMsg, exception);
- _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, accessor);
- }
-
- // Post-processing for the finished task
- try
- {
- if (!_message.getGroupMessageMode())
- {
- removeMessageFromZk(accessor, _message);
- reportMessageStat(_manager, _message, taskResult);
- sendReply(accessor, _message, taskResult);
- }
- else
- {
- GroupMessageInfo info = _executor._groupMsgHandler.onCompleteSubMessage(_message);
- if (info != null)
- {
- // TODO: changed to async update
- // group update current state
- Map<PropertyKey, CurrentState> curStateMap = info.merge();
- for (PropertyKey key : curStateMap.keySet())
- {
- accessor.updateProperty(key, curStateMap.get(key));
- }
-
- // remove group message
- removeMessageFromZk(accessor, _message);
- reportMessageStat(_manager, _message, taskResult);
- sendReply(accessor, _message, taskResult);
- }
- }
- _executor.reportCompletion(_message);
- }
-
- // TODO: capture errors and log here
- catch (Exception e)
- {
- String errorMessage =
- "Exception after executing a message, msgId: " + _message.getMsgId() + e;
- logger.error(errorMessage, e);
- _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, accessor);
- exception = e;
- type = ErrorType.FRAMEWORK;
- code = ErrorCode.ERROR;
- }
- //
- finally
- {
- long end = System.currentTimeMillis();
- logger.info("msg:" + _message.getMsgId() + " handling task completed, results:"
- + taskResult.isSucess() + ", at: " + end + ", took:" + (end - start));
-
- // Notify the handler about any error happened in the handling procedure, so that
- // the handler have chance to finally cleanup
- if (exception != null)
- {
- _handler.onError(exception, code, type);
- }
- }
- return taskResult;
- }
-
- private void removeMessageFromZk(HelixDataAccessor accessor, Message message)
- {
- Builder keyBuilder = accessor.keyBuilder();
- if (message.getTgtName().equalsIgnoreCase("controller"))
- {
- // TODO: removeProperty returns boolean
- accessor.removeProperty(keyBuilder.controllerMessage(message.getMsgId()));
- }
- else
- {
- accessor.removeProperty(keyBuilder.message(_manager.getInstanceName(),
- message.getMsgId()));
- }
- }
-
- private void sendReply(HelixDataAccessor accessor,
- Message message,
- HelixTaskResult taskResult)
- {
- if (_message.getCorrelationId() != null
- && !message.getMsgType().equals(MessageType.TASK_REPLY.toString()))
- {
- logger.info("Sending reply for message " + message.getCorrelationId());
- _statusUpdateUtil.logInfo(message, HelixTask.class, "Sending reply", accessor);
-
- taskResult.getTaskResultMap().put("SUCCESS", "" + taskResult.isSucess());
- taskResult.getTaskResultMap().put("INTERRUPTED", "" + taskResult.isInterrupted());
- if (!taskResult.isSucess())
- {
- taskResult.getTaskResultMap().put("ERRORINFO", taskResult.getMessage());
- }
- Message replyMessage =
- Message.createReplyMessage(_message,
- _manager.getInstanceName(),
- taskResult.getTaskResultMap());
- replyMessage.setSrcInstanceType(_manager.getInstanceType());
-
- if (message.getSrcInstanceType() == InstanceType.PARTICIPANT)
- {
- Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.message(message.getMsgSrc(),
- replyMessage.getMsgId()),
- replyMessage);
- }
- else if (message.getSrcInstanceType() == InstanceType.CONTROLLER)
- {
- Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId()),
- replyMessage);
- }
- _statusUpdateUtil.logInfo(message, HelixTask.class, "1 msg replied to "
- + replyMessage.getTgtName(), accessor);
- }
- }
-
- private void reportMessageStat(HelixManager manager,
- Message message,
- HelixTaskResult taskResult)
- {
- // report stat
- if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
- {
- return;
- }
- long now = new Date().getTime();
- long msgReadTime = message.getReadTimeStamp();
- long msgExecutionStartTime = message.getExecuteStartTimeStamp();
- if (msgReadTime != 0 && msgExecutionStartTime != 0)
- {
- long totalDelay = now - msgReadTime;
- long executionDelay = now - msgExecutionStartTime;
- if (totalDelay > 0 && executionDelay > 0)
- {
- String fromState = message.getFromState();
- String toState = message.getToState();
- String transition = fromState + "--" + toState;
-
- StateTransitionContext cxt =
- new StateTransitionContext(manager.getClusterName(),
- manager.getInstanceName(),
- message.getResourceName(),
- transition);
-
- StateTransitionDataPoint data =
- new StateTransitionDataPoint(totalDelay,
- executionDelay,
- taskResult.isSucess());
- _executor.getParticipantMonitor().reportTransitionStat(cxt, data);
- }
- }
- else
- {
- logger.warn("message read time and start execution time not recorded.");
- }
- }
-
-};
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/3cb7a1c9/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixTaskExecutor.java
deleted file mode 100644
index 84fa329..0000000
--- a/helix-core/src/main/java/com/linkedin/helix/messaging/handling/HelixTaskExecutor.java
+++ /dev/null
@@ -1,638 +0,0 @@
-/**
- * Copyright (C) 2012 LinkedIn Inc <op...@linkedin.com>
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.helix.messaging.handling;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.log4j.Logger;
-
-import com.linkedin.helix.ConfigAccessor;
-import com.linkedin.helix.ConfigScope;
-import com.linkedin.helix.ConfigScopeBuilder;
-import com.linkedin.helix.HelixConstants;
-import com.linkedin.helix.HelixDataAccessor;
-import com.linkedin.helix.HelixException;
-import com.linkedin.helix.HelixManager;
-import com.linkedin.helix.MessageListener;
-import com.linkedin.helix.NotificationContext;
-import com.linkedin.helix.NotificationContext.Type;
-import com.linkedin.helix.PropertyKey;
-import com.linkedin.helix.PropertyKey.Builder;
-import com.linkedin.helix.ZNRecord;
-import com.linkedin.helix.model.CurrentState;
-import com.linkedin.helix.model.Message;
-import com.linkedin.helix.model.Message.Attributes;
-import com.linkedin.helix.model.Message.MessageState;
-import com.linkedin.helix.model.Message.MessageType;
-import com.linkedin.helix.monitoring.ParticipantMonitor;
-import com.linkedin.helix.participant.HelixStateMachineEngine;
-import com.linkedin.helix.util.StatusUpdateUtil;
-
-public class HelixTaskExecutor implements MessageListener
-{
- // TODO: we need to further design how to throttle this.
- // From storage point of view, only bootstrap case is expensive
- // and we need to throttle, which is mostly IO / network bounded.
- public static final int DEFAULT_PARALLEL_TASKS = 40;
- // TODO: create per-task type threadpool with customizable pool size
- protected final Map<String, Future<HelixTaskResult>> _taskMap;
- private final Object _lock;
- private final StatusUpdateUtil _statusUpdateUtil;
- private final ParticipantMonitor _monitor;
- public static final String MAX_THREADS =
- "maxThreads";
-
- final ConcurrentHashMap<String, MessageHandlerFactory> _handlerFactoryMap =
- new ConcurrentHashMap<String, MessageHandlerFactory>();
-
- final ConcurrentHashMap<String, ExecutorService> _threadpoolMap =
- new ConcurrentHashMap<String, ExecutorService>();
-
- private static Logger LOG =
- Logger.getLogger(HelixTaskExecutor.class);
-
- Map<String, Integer> _resourceThreadpoolSizeMap =
- new ConcurrentHashMap<String, Integer>();
-
- final GroupMessageHandler _groupMsgHandler;
-
- public HelixTaskExecutor()
- {
- _taskMap = new ConcurrentHashMap<String, Future<HelixTaskResult>>();
- _groupMsgHandler = new GroupMessageHandler();
-
- _lock = new Object();
- _statusUpdateUtil = new StatusUpdateUtil();
- _monitor = new ParticipantMonitor();
- startMonitorThread();
- }
-
- public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory)
- {
- registerMessageHandlerFactory(type, factory, DEFAULT_PARALLEL_TASKS);
- }
-
- public void registerMessageHandlerFactory(String type,
- MessageHandlerFactory factory,
- int threadpoolSize)
- {
- if (!_handlerFactoryMap.containsKey(type))
- {
- if (!type.equalsIgnoreCase(factory.getMessageType()))
- {
- throw new HelixException("Message factory type mismatch. Type: " + type
- + " factory : " + factory.getMessageType());
-
- }
- _handlerFactoryMap.put(type, factory);
- _threadpoolMap.put(type, Executors.newFixedThreadPool(threadpoolSize));
- LOG.info("Adding msg factory for type " + type + " threadpool size "
- + threadpoolSize);
- }
- else
- {
- LOG.error("Ignoring duplicate msg handler factory for type " + type);
- }
- }
-
- public ParticipantMonitor getParticipantMonitor()
- {
- return _monitor;
- }
-
- private void startMonitorThread()
- {
- // start a thread which monitors the completions of task
- }
-
- void checkResourceConfig(String resourceName, HelixManager manager)
- {
- if (!_resourceThreadpoolSizeMap.containsKey(resourceName))
- {
- int threadpoolSize = -1;
- ConfigAccessor configAccessor = manager.getConfigAccessor();
- if (configAccessor != null)
- {
- ConfigScope scope =
- new ConfigScopeBuilder().forCluster(manager.getClusterName())
- .forResource(resourceName)
- .build();
-
- String threadpoolSizeStr = configAccessor.get(scope, MAX_THREADS);
- try
- {
- if (threadpoolSizeStr != null)
- {
- threadpoolSize = Integer.parseInt(threadpoolSizeStr);
- }
- }
- catch (Exception e)
- {
- LOG.error("", e);
- }
- }
- if (threadpoolSize > 0)
- {
- String key = MessageType.STATE_TRANSITION.toString() + "." + resourceName;
- _threadpoolMap.put(key, Executors.newFixedThreadPool(threadpoolSize));
- LOG.info("Adding per resource threadpool for resource " + resourceName
- + " with size " + threadpoolSize);
- }
- _resourceThreadpoolSizeMap.put(resourceName, threadpoolSize);
- }
- }
-
- /**
- * Find the executor service for the message. A message can have a per-statemodelfactory
- * executor service, or per-message type executor service.
- *
- **/
- ExecutorService findExecutorServiceForMsg(Message message)
- {
- ExecutorService executorService = _threadpoolMap.get(message.getMsgType());
- if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
- {
- String resourceName = message.getResourceName();
- if (resourceName != null)
- {
- String key = message.getMsgType() + "." + resourceName;
- if (_threadpoolMap.containsKey(key))
- {
- LOG.info("Find per-resource thread pool with key " + key);
- executorService = _threadpoolMap.get(key);
- }
- }
- }
- return executorService;
- }
-
- public void scheduleTask(Message message,
- MessageHandler handler,
- NotificationContext notificationContext)
- {
- assert (handler != null);
- synchronized (_lock)
- {
- try
- {
- String taskId = message.getMsgId() + "/" + message.getPartitionName();
-
- if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
- {
- checkResourceConfig(message.getResourceName(), notificationContext.getManager());
- }
- LOG.info("Scheduling message: " + taskId);
- // System.out.println("sched msg: " + message.getPartitionName() + "-"
- // + message.getTgtName() + "-" + message.getFromState() + "-"
- // + message.getToState());
-
- _statusUpdateUtil.logInfo(message,
- HelixTaskExecutor.class,
- "Message handling task scheduled",
- notificationContext.getManager().getHelixDataAccessor());
-
- HelixTask task = new HelixTask(message, notificationContext, handler, this);
- if (!_taskMap.containsKey(taskId))
- {
- LOG.info("Message:" + taskId + " handling task scheduled");
- Future<HelixTaskResult> future =
- findExecutorServiceForMsg(message).submit(task);
- _taskMap.put(taskId, future);
- }
- else
- {
- _statusUpdateUtil.logWarning(message,
- HelixTaskExecutor.class,
- "Message handling task already sheduled for "
- + taskId,
- notificationContext.getManager()
- .getHelixDataAccessor());
- }
- }
- catch (Exception e)
- {
- LOG.error("Error while executing task." + message, e);
-
- _statusUpdateUtil.logError(message,
- HelixTaskExecutor.class,
- e,
- "Error while executing task " + e,
- notificationContext.getManager()
- .getHelixDataAccessor());
- }
- }
- }
-
- public void cancelTask(Message message, NotificationContext notificationContext)
- {
- synchronized (_lock)
- {
- String taskId = message.getMsgId() + "/" + message.getPartitionName();
-
- if (_taskMap.containsKey(taskId))
- {
- _statusUpdateUtil.logInfo(message,
- HelixTaskExecutor.class,
- "Trying to cancel the future for " + taskId,
- notificationContext.getManager().getHelixDataAccessor());
- Future<HelixTaskResult> future = _taskMap.get(taskId);
-
- // If the thread is still running it will be interrupted if cancel(true)
- // is called. So state transition callbacks should implement logic to
- // return
- // if it is interrupted.
- if (future.cancel(true))
- {
- _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceled "
- + taskId, notificationContext.getManager().getHelixDataAccessor());
- _taskMap.remove(taskId);
- }
- else
- {
- _statusUpdateUtil.logInfo(message,
- HelixTaskExecutor.class,
- "false when trying to cancel the message " + taskId,
- notificationContext.getManager()
- .getHelixDataAccessor());
- }
- }
- else
- {
- _statusUpdateUtil.logWarning(message,
- HelixTaskExecutor.class,
- "Future not found when trying to cancel " + taskId,
- notificationContext.getManager()
- .getHelixDataAccessor());
- }
- }
- }
-
- protected void reportCompletion(Message message) // String msgId)
- {
- synchronized (_lock)
- {
- String taskId = message.getMsgId() + "/" + message.getPartitionName();
- LOG.info("message finished: " + taskId + ", took "
- + (new Date().getTime() - message.getExecuteStartTimeStamp()));
- if (_taskMap.containsKey(taskId))
- {
- _taskMap.remove(taskId);
- }
- else
- {
- LOG.warn("message " + taskId + "not found in task map");
- }
- }
- }
-
- private void updateMessageState(List<Message> readMsgs,
- HelixDataAccessor accessor,
- String instanceName)
- {
- Builder keyBuilder = accessor.keyBuilder();
- List<PropertyKey> readMsgKeys = new ArrayList<PropertyKey>();
- for (Message msg : readMsgs)
- {
- readMsgKeys.add(msg.getKey(keyBuilder, instanceName));
- }
- accessor.setChildren(readMsgKeys, readMsgs);
- }
-
- @Override
- public void onMessage(String instanceName,
- List<Message> messages,
- NotificationContext changeContext)
- {
- // If FINALIZE notification comes, reset all handler factories
- // and terminate all the thread pools
- // TODO: see if we should have a separate notification call for resetting
- if (changeContext.getType() == Type.FINALIZE)
- {
- LOG.info("Get FINALIZE notification");
- for (MessageHandlerFactory factory : _handlerFactoryMap.values())
- {
- factory.reset();
- }
- // Cancel all scheduled future
- // synchronized (_lock)
- {
- for (Future<HelixTaskResult> f : _taskMap.values())
- {
- f.cancel(true);
- }
- _taskMap.clear();
- }
- return;
- }
-
- HelixManager manager = changeContext.getManager();
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
- Builder keyBuilder = accessor.keyBuilder();
-
- if (messages == null || messages.size() == 0)
- {
- LOG.info("No Messages to process");
- return;
- }
-
- // sort message by creation timestamp, so message created earlier is processed first
- Collections.sort(messages, Message.CREATE_TIME_COMPARATOR);
-
- // message handlers created
- List<MessageHandler> handlers = new ArrayList<MessageHandler>();
-
- // message read
- List<Message> readMsgs = new ArrayList<Message>();
-
- String sessionId = manager.getSessionId();
- List<String> curResourceNames =
- accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
- List<PropertyKey> createCurStateKeys = new ArrayList<PropertyKey>();
- List<CurrentState> metaCurStates = new ArrayList<CurrentState>();
- Set<String> createCurStateNames = new HashSet<String>();
-
- changeContext.add(NotificationContext.TASK_EXECUTOR_KEY, this);
- for (Message message : messages)
- {
- // nop messages are simply removed. It is used to trigger onMessage() in
- // situations such as register a new message handler factory
- if (message.getMsgType().equalsIgnoreCase(MessageType.NO_OP.toString()))
- {
- LOG.info("Dropping NO-OP message. mid: " + message.getId() + ", from: "
- + message.getMsgSrc());
- accessor.removeProperty(message.getKey(keyBuilder, instanceName));
- continue;
- }
-
- String tgtSessionId = message.getTgtSessionId();
-
- // if sessionId not match, remove it
- if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*"))
- {
- String warningMessage =
- "SessionId does NOT match. expected sessionId: " + sessionId
- + ", tgtSessionId in message: " + tgtSessionId + ", messageId: "
- + message.getMsgId();
- LOG.warn(warningMessage);
- accessor.removeProperty(message.getKey(keyBuilder, instanceName));
- _statusUpdateUtil.logWarning(message,
- HelixStateMachineEngine.class,
- warningMessage,
- accessor);
- continue;
- }
-
- // don't process message that is of READ or UNPROCESSABLE state
- if (MessageState.NEW != message.getMsgState())
- {
- // It happens because we don't delete message right after
- // read. Instead we keep it until the current state is updated.
- // We will read the message again if there is a new message but we
- // check for the status and ignore if its already read
- LOG.trace("Message already read. mid: " + message.getMsgId());
- continue;
- }
-
- // create message handlers, if handlers not found, leave its state as NEW
- try
- {
- List<MessageHandler> createHandlers =
- createMessageHandlers(message, changeContext);
- if (createHandlers.isEmpty())
- {
- continue;
- }
- handlers.addAll(createHandlers);
- }
- catch (Exception e)
- {
- LOG.error("Failed to create message handler for " + message.getMsgId(), e);
- String error =
- "Failed to create message handler for " + message.getMsgId()
- + ", exception: " + e;
-
- _statusUpdateUtil.logError(message,
- HelixStateMachineEngine.class,
- e,
- error,
- accessor);
-
- // Mark message state UNPROCESSABLE if we hit an exception in creating
- // message handler. The message will stay on zookeeper but will not be processed
- message.setMsgState(MessageState.UNPROCESSABLE);
- accessor.updateProperty(message.getKey(keyBuilder, instanceName), message);
- continue;
- }
-
- // update msgState to read
- message.setMsgState(MessageState.READ);
- message.setReadTimeStamp(new Date().getTime());
- message.setExecuteSessionId(changeContext.getManager().getSessionId());
-
- _statusUpdateUtil.logInfo(message,
- HelixStateMachineEngine.class,
- "New Message",
- accessor);
-
- readMsgs.add(message);
-
- // batch creation of all current state meta data
- // do it for non-controller and state transition messages only
- if (!message.isControlerMsg()
- && message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString()))
- {
- String resourceName = message.getResourceName();
- if (!curResourceNames.contains(resourceName)
- && !createCurStateNames.contains(resourceName))
- {
- createCurStateNames.add(resourceName);
- createCurStateKeys.add(keyBuilder.currentState(instanceName,
- sessionId,
- resourceName));
-
- CurrentState metaCurState = new CurrentState(resourceName);
- metaCurState.setBucketSize(message.getBucketSize());
- metaCurState.setStateModelDefRef(message.getStateModelDef());
- metaCurState.setSessionId(sessionId);
- metaCurState.setGroupMessageMode(message.getGroupMessageMode());
- String ftyName = message.getStateModelFactoryName();
- if (ftyName != null)
- {
- metaCurState.setStateModelFactoryName(ftyName);
- }
- else
- {
- metaCurState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
- }
-
- metaCurStates.add(metaCurState);
- }
- }
- }
-
- // batch create curState meta
- if (createCurStateKeys.size() > 0)
- {
- try
- {
- accessor.createChildren(createCurStateKeys, metaCurStates);
- }
- catch (Exception e)
- {
- LOG.error(e);
- }
- }
-
- // update message state to READ in batch and schedule all read messages
- if (readMsgs.size() > 0)
- {
- updateMessageState(readMsgs, accessor, instanceName);
-
- for (MessageHandler handler : handlers)
- {
- scheduleTask(handler._message, handler, changeContext);
- }
- }
- }
-
- private MessageHandler createMessageHandler(Message message,
- NotificationContext changeContext)
- {
- String msgType = message.getMsgType().toString();
-
- MessageHandlerFactory handlerFactory = _handlerFactoryMap.get(msgType);
-
- // Fail to find a MessageHandlerFactory for the message
- // we will keep the message and the message will be handled when
- // the corresponding MessageHandlerFactory is registered
- if (handlerFactory == null)
- {
- LOG.warn("Fail to find message handler factory for type: " + msgType + " mid:"
- + message.getMsgId());
- return null;
- }
-
- return handlerFactory.createHandler(message, changeContext);
- }
-
- private List<MessageHandler> createMessageHandlers(Message message,
- NotificationContext changeContext)
- {
- List<MessageHandler> handlers = new ArrayList<MessageHandler>();
- if (!message.getGroupMessageMode())
- {
- LOG.info("Creating handler for message " + message.getMsgId() + "/"
- + message.getPartitionName());
-
- MessageHandler handler = createMessageHandler(message, changeContext);
-
- if (handler != null)
- {
- handlers.add(handler);
- }
- }
- else
- {
- _groupMsgHandler.put(message);
-
- List<String> partitionNames = message.getPartitionNames();
- for (String partitionName : partitionNames)
- {
- Message subMsg = new Message(message.getRecord());
- subMsg.setPartitionName(partitionName);
- subMsg.setAttribute(Attributes.PARENT_MSG_ID, message.getId());
-
- LOG.info("Creating handler for group message " + subMsg.getMsgId() + "/"
- + partitionName);
- MessageHandler handler = createMessageHandler(subMsg, changeContext);
- if (handler != null)
- {
- handlers.add(handler);
- }
- }
- }
-
- return handlers;
- }
-
- public void shutDown()
- {
- LOG.info("shutting down TaskExecutor");
- synchronized (_lock)
- {
- for (String msgType : _threadpoolMap.keySet())
- {
- List<Runnable> tasksLeft = _threadpoolMap.get(msgType).shutdownNow();
- LOG.info(tasksLeft.size() + " tasks are still in the threadpool for msgType "
- + msgType);
- }
- for (String msgType : _threadpoolMap.keySet())
- {
- try
- {
- if (!_threadpoolMap.get(msgType).awaitTermination(200, TimeUnit.MILLISECONDS))
- {
- LOG.warn(msgType + " is not fully termimated in 200 MS");
- System.out.println(msgType + " is not fully termimated in 200 MS");
- }
- }
- catch (InterruptedException e)
- {
- LOG.error("Interrupted", e);
- }
- }
- }
- _monitor.shutDown();
- LOG.info("shutdown finished");
- }
-
- // TODO: remove this
- public static void main(String[] args) throws Exception
- {
- ExecutorService pool = Executors.newFixedThreadPool(DEFAULT_PARALLEL_TASKS);
- Future<HelixTaskResult> future;
- future = pool.submit(new Callable<HelixTaskResult>()
- {
-
- @Override
- public HelixTaskResult call() throws Exception
- {
- System.out.println("CMTaskExecutor.main(...).new Callable() {...}.call()");
- return null;
- }
-
- });
- future = pool.submit(new HelixTask(null, null, null, null));
- Thread.currentThread().join();
- System.out.println(future.isDone());
- }
-}