You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/05/09 19:59:52 UTC
git commit: [HELIX-443] Race condition in Helix register/unregister MessageHandlerFactory, rb=21248
Repository: helix
Updated Branches:
refs/heads/helix-0.6.2-release 6a1b7b003 -> 7b5250a34
[HELIX-443] Race condition in Helix register/unregister MessageHandlerFactory, rb=21248
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7b5250a3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7b5250a3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7b5250a3
Branch: refs/heads/helix-0.6.2-release
Commit: 7b5250a34dfbc9317f990cf309d9aa32a9e4973b
Parents: 6a1b7b0
Author: zzhang <zz...@apache.org>
Authored: Fri May 9 10:59:34 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Fri May 9 10:59:34 2014 -0700
----------------------------------------------------------------------
.../messaging/handling/HelixTaskExecutor.java | 121 ++++++++++---------
1 file changed, 67 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/7b5250a3/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 600a3ab..a227eb3 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -35,8 +35,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.helix.ConfigAccessor;
-import org.apache.helix.model.ConfigScope;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
@@ -48,6 +47,8 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.NotificationContext.Type;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
@@ -101,19 +102,22 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
@Override
public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
int threadpoolSize) {
- if (!_handlerFactoryMap.containsKey(type)) {
- if (!type.equalsIgnoreCase(factory.getMessageType())) {
- throw new HelixException("Message factory type mismatch. Type: " + type + " factory : "
- + factory.getMessageType());
+ if (!type.equalsIgnoreCase(factory.getMessageType())) {
+ throw new HelixException("Message factory type mismatch. Type: " + type + ", factory: "
+ + factory.getMessageType());
+ }
+ MessageHandlerFactory prevFactory = _handlerFactoryMap.putIfAbsent(type, factory); // non-null => type already registered
+ if (prevFactory == null) {
+ if (!_executorMap.containsKey(type)) { // containsKey: legacy contains() on ConcurrentHashMap/Hashtable matches values, not keys
+ _executorMap.put(type, Executors.newFixedThreadPool(threadpoolSize));
+ } else {
+ LOG.error("Skip to create new thread pool for type: " + type);
}
- _handlerFactoryMap.put(type, factory);
- ExecutorService executorSvc = Executors.newFixedThreadPool(threadpoolSize);
- _executorMap.put(type, executorSvc);
-
- LOG.info("Added msg-factory for type: " + type + ", threadpool size " + threadpoolSize);
+ LOG.info("Registered message handler factory for type: " + type + ", poolSize: "
+ + threadpoolSize + ", factory: " + factory + ", pool: " + _executorMap.get(type));
} else {
- LOG.warn("Fail to register msg-handler-factory for type: " + type + ", pool-size: "
+ LOG.warn("Fail to register message handler factory for type: " + type + ", poolSize: "
+ threadpoolSize + ", factory: " + factory);
}
}
@@ -131,9 +135,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
int threadpoolSize = -1;
ConfigAccessor configAccessor = manager.getConfigAccessor();
if (configAccessor != null) {
- ConfigScope scope =
- new ConfigScopeBuilder().forCluster(manager.getClusterName()).forResource(resourceName)
- .build();
+ HelixConfigScope scope =
+ new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE)
+ .forCluster(manager.getClusterName()).forResource(resourceName).build();
String threadpoolSizeStr = configAccessor.get(scope, MAX_THREADS);
try {
@@ -240,6 +244,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
synchronized (_lock) {
if (!_taskMap.containsKey(taskId)) {
ExecutorService exeSvc = findExecutorServiceForMsg(message);
+
+ LOG.info("Submit task: " + taskId + " to pool: " + exeSvc);
Future<HelixTaskResult> future = exeSvc.submit(task);
TimerTask timerTask = null;
@@ -294,8 +300,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
// If the thread is still running it will be interrupted if cancel(true)
// is called. So state transition callbacks should implement logic to
- // return
- // if it is interrupted.
+ // return if it is interrupted.
+ LOG.info("Cancelling task: " + taskId);
if (future.cancel(true)) {
_statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceled task: " + taskId,
notificationContext.getManager().getHelixDataAccessor());
@@ -344,44 +350,65 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
accessor.setChildren(readMsgKeys, readMsgs);
}
+ private void shutdownAndAwaitTermination(ExecutorService pool) {
+ LOG.info("Shutting down pool: " + pool);
+ pool.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait up to 200 ms (MILLISECONDS, matching the error log below) for running tasks to finish
+ if (!pool.awaitTermination(200, TimeUnit.MILLISECONDS)) {
+ List<Runnable> waitingTasks = pool.shutdownNow(); // Interrupt running tasks; returns queued tasks that never started
+ LOG.info("Tasks that never commenced execution: " + waitingTasks);
+
+ // Wait up to another 200 ms for tasks to respond to being interrupted
+ if (!pool.awaitTermination(200, TimeUnit.MILLISECONDS)) {
+ LOG.error("Pool did not fully terminate in 200ms. pool: " + pool);
+ }
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ LOG.error("Interruped when waiting for shutdown pool: " + pool, ie);
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+
/**
* remove message-handler factory from map, shutdown the associated executor
* @param type
*/
void unregisterMessageHandlerFactory(String type) {
- // shutdown executor-service. disconnect if fail
- ExecutorService executorSvc = _executorMap.remove(type);
- if (executorSvc != null) {
- List<Runnable> tasksLeft = executorSvc.shutdownNow();
- LOG.info(tasksLeft.size() + " tasks never executed for msgType: " + type + ". tasks: "
- + tasksLeft);
- try {
- if (!executorSvc.awaitTermination(200, TimeUnit.MILLISECONDS)) {
- LOG.error("executor-service for msgType: " + type
- + " is not fully terminated in 200ms. will disconnect helix-participant");
- throw new HelixException("fail to unregister msg-handler for msgType: " + type);
- }
- } catch (InterruptedException e) {
- LOG.error("interruped when waiting for executor-service shutdown for msgType: " + type, e);
- }
+ ExecutorService pool = _executorMap.remove(type); // detach pool and factory from the maps before the (possibly slow) shutdown
+ MessageHandlerFactory handlerFty = _handlerFactoryMap.remove(type); // either may be null if the type was never registered
+
+ LOG.info("Unregistering message handler factory for type: " + type + ", factory: " + handlerFty
+ + ", pool: " + pool);
+
+ if (pool != null) {
+ shutdownAndAwaitTermination(pool); // logs (no longer throws) if the pool fails to terminate
}
// reset state-model
- MessageHandlerFactory handlerFty = _handlerFactoryMap.remove(type);
if (handlerFty != null) {
handlerFty.reset();
}
+
+ LOG.info("Unregistered message handler factory for type: " + type + ", factory: " + handlerFty
+ + ", pool: " + pool);
}
void reset() {
- LOG.info("Get FINALIZE notification");
+ LOG.info("Reset HelixTaskExecutor");
for (String msgType : _executorMap.keySet()) {
unregisterMessageHandlerFactory(msgType);
}
- // clear task-map, all tasks should be terminated by now
+ // Every pool has been shut down above; log any task still in the task map before clearing it
+ for (String taskId : _taskMap.keySet()) {
+ MessageTaskInfo info = _taskMap.get(taskId); // NOTE(review): may be null if an entry is removed concurrently after keySet() - confirm
+ LOG.warn("Task: " + taskId + " fails to terminate. Message: " + info._task.getMessage());
+ }
_taskMap.clear();
-
}
@Override
@@ -555,26 +582,12 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
@Override
public void shutdown() {
- LOG.info("shutting down TaskExecutor");
+ LOG.info("Shutting down HelixTaskExecutor");
_timer.cancel();
- synchronized (_lock) {
- for (String msgType : _executorMap.keySet()) {
- List<Runnable> tasksLeft = _executorMap.get(msgType).shutdownNow();
- LOG.info(tasksLeft.size() + " tasks are still in the threadpool for msgType " + msgType);
- }
- for (String msgType : _executorMap.keySet()) {
- try {
- if (!_executorMap.get(msgType).awaitTermination(200, TimeUnit.MILLISECONDS)) {
- LOG.warn(msgType + " is not fully termimated in 200 MS");
- System.out.println(msgType + " is not fully termimated in 200 MS");
- }
- } catch (InterruptedException e) {
- LOG.error("Interrupted", e);
- }
- }
- }
+ reset(); // unregisters every message type (shutting down its pool) and clears the task map
+ // NOTE(review): the removed code held _lock while shutting pools down; reset() does not - confirm intended
_monitor.shutDown();
- LOG.info("shutdown finished");
+ LOG.info("Shutdown HelixTaskExecutor finished");
}
}