You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/05/09 19:59:52 UTC

git commit: [HELIX-443] Race condition in Helix register/unregister MessageHandlerFactory, rb=21248

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.2-release 6a1b7b003 -> 7b5250a34


[HELIX-443] Race condition in Helix register/unregister MessageHandlerFactory, rb=21248


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7b5250a3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7b5250a3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7b5250a3

Branch: refs/heads/helix-0.6.2-release
Commit: 7b5250a34dfbc9317f990cf309d9aa32a9e4973b
Parents: 6a1b7b0
Author: zzhang <zz...@apache.org>
Authored: Fri May 9 10:59:34 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Fri May 9 10:59:34 2014 -0700

----------------------------------------------------------------------
 .../messaging/handling/HelixTaskExecutor.java   | 121 ++++++++++---------
 1 file changed, 67 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7b5250a3/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 600a3ab..a227eb3 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -35,8 +35,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.ConfigAccessor;
-import org.apache.helix.model.ConfigScope;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
@@ -48,6 +47,8 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
@@ -101,19 +102,22 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   @Override
   public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
       int threadpoolSize) {
-    if (!_handlerFactoryMap.containsKey(type)) {
-      if (!type.equalsIgnoreCase(factory.getMessageType())) {
-        throw new HelixException("Message factory type mismatch. Type: " + type + " factory : "
-            + factory.getMessageType());
+    if (!type.equalsIgnoreCase(factory.getMessageType())) { // validate before touching any map
+      throw new HelixException("Message factory type mismatch. Type: " + type + ", factory: "
+          + factory.getMessageType());
+    }
 
+    MessageHandlerFactory prevFactory = _handlerFactoryMap.putIfAbsent(type, factory);
+    if (prevFactory == null) { // no factory was registered for this type before this call
+      if (!_executorMap.containsKey(type)) { // key lookup; legacy contains(Object) tests values
+        _executorMap.put(type, Executors.newFixedThreadPool(threadpoolSize));
+      } else {
+        LOG.error("Skip to create new thread pool for type: " + type);
       }
-      _handlerFactoryMap.put(type, factory);
-      ExecutorService executorSvc = Executors.newFixedThreadPool(threadpoolSize);
-      _executorMap.put(type, executorSvc);
-
-      LOG.info("Added msg-factory for type: " + type + ", threadpool size " + threadpoolSize);
+      LOG.info("Registered message handler factory for type: " + type + ", poolSize: "
+          + threadpoolSize + ", factory: " + factory + ", pool: " + _executorMap.get(type));
     } else {
-      LOG.warn("Fail to register msg-handler-factory for type: " + type + ", pool-size: "
+      LOG.warn("Fail to register message handler factory for type: " + type + ", poolSize: "
           + threadpoolSize + ", factory: " + factory);
     }
   }
@@ -131,9 +135,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
       int threadpoolSize = -1;
       ConfigAccessor configAccessor = manager.getConfigAccessor();
       if (configAccessor != null) {
-        ConfigScope scope =
-            new ConfigScopeBuilder().forCluster(manager.getClusterName()).forResource(resourceName)
-                .build();
+        HelixConfigScope scope =
+            new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE)
+                .forCluster(manager.getClusterName()).forResource(resourceName).build();
 
         String threadpoolSizeStr = configAccessor.get(scope, MAX_THREADS);
         try {
@@ -240,6 +244,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
       synchronized (_lock) {
         if (!_taskMap.containsKey(taskId)) {
           ExecutorService exeSvc = findExecutorServiceForMsg(message);
+
+          LOG.info("Submit task: " + taskId + " to pool: " + exeSvc);
           Future<HelixTaskResult> future = exeSvc.submit(task);
 
           TimerTask timerTask = null;
@@ -294,8 +300,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
 
         // If the thread is still running it will be interrupted if cancel(true)
         // is called. So state transition callbacks should implement logic to
-        // return
-        // if it is interrupted.
+        // return if it is interrupted.
+        LOG.info("Cancelling task: " + taskId);
         if (future.cancel(true)) {
           _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceled task: " + taskId,
               notificationContext.getManager().getHelixDataAccessor());
@@ -344,44 +350,65 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     accessor.setChildren(readMsgKeys, readMsgs);
   }
 
+  private void shutdownAndAwaitTermination(ExecutorService pool) { // orderly stop, then forced
+    LOG.info("Shutting down pool: " + pool);
+    pool.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait up to 200ms for existing tasks to terminate
+      if (!pool.awaitTermination(200, TimeUnit.MILLISECONDS)) {
+        List<Runnable> waitingTasks = pool.shutdownNow(); // Cancel currently executing tasks
+        LOG.info("Tasks that never commenced execution: " + waitingTasks);
+
+        // Wait up to another 200ms for tasks to respond to being cancelled
+        if (!pool.awaitTermination(200, TimeUnit.MILLISECONDS)) {
+          LOG.error("Pool did not fully terminate in 200ms. pool: " + pool);
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      LOG.error("Interruped when waiting for shutdown pool: " + pool, ie);
+      pool.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
+
   /**
    * remove message-handler factory from map, shutdown the associated executor
    * @param type
    */
   void unregisterMessageHandlerFactory(String type) {
-    // shutdown executor-service. disconnect if fail
-    ExecutorService executorSvc = _executorMap.remove(type);
-    if (executorSvc != null) {
-      List<Runnable> tasksLeft = executorSvc.shutdownNow();
-      LOG.info(tasksLeft.size() + " tasks never executed for msgType: " + type + ". tasks: "
-          + tasksLeft);
-      try {
-        if (!executorSvc.awaitTermination(200, TimeUnit.MILLISECONDS)) {
-          LOG.error("executor-service for msgType: " + type
-              + " is not fully terminated in 200ms. will disconnect helix-participant");
-          throw new HelixException("fail to unregister msg-handler for msgType: " + type);
-        }
-      } catch (InterruptedException e) {
-        LOG.error("interruped when waiting for executor-service shutdown for msgType: " + type, e);
-      }
+    ExecutorService pool = _executorMap.remove(type); // null when no pool is mapped to type
+    MessageHandlerFactory handlerFty = _handlerFactoryMap.remove(type); // may be null too
+
+    LOG.info("Unregistering message handler factory for type: " + type + ", factory: " + handlerFty
+        + ", pool: " + pool);
+
+    if (pool != null) {
+      shutdownAndAwaitTermination(pool); // returns once the pool stops or its waits time out
     }
 
     // reset state-model
-    MessageHandlerFactory handlerFty = _handlerFactoryMap.remove(type);
     if (handlerFty != null) {
       handlerFty.reset();
     }
+
+    LOG.info("Unregistered message handler factory for type: " + type + ", factory: " + handlerFty
+        + ", pool: " + pool);
   }
 
   void reset() {
-    LOG.info("Get FINALIZE notification");
+    LOG.info("Reset HelixTaskExecutor");
     for (String msgType : _executorMap.keySet()) {
       unregisterMessageHandlerFactory(msgType);
     }
 
-    // clear task-map, all tasks should be terminated by now
+    // Warn about every task still left in _taskMap after the loop above, then drop them all
+    for (String taskId : _taskMap.keySet()) {
+      MessageTaskInfo info = _taskMap.get(taskId);
+      LOG.warn("Task: " + taskId + " fails to terminate. Message: " + info._task.getMessage());
+    }
     _taskMap.clear();
-
   }
 
   @Override
@@ -555,26 +582,12 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
 
   @Override
   public void shutdown() {
-    LOG.info("shutting down TaskExecutor");
+    LOG.info("Shutting down HelixTaskExecutor");
     _timer.cancel();
 
-    synchronized (_lock) {
-      for (String msgType : _executorMap.keySet()) {
-        List<Runnable> tasksLeft = _executorMap.get(msgType).shutdownNow();
-        LOG.info(tasksLeft.size() + " tasks are still in the threadpool for msgType " + msgType);
-      }
-      for (String msgType : _executorMap.keySet()) {
-        try {
-          if (!_executorMap.get(msgType).awaitTermination(200, TimeUnit.MILLISECONDS)) {
-            LOG.warn(msgType + " is not fully termimated in 200 MS");
-            System.out.println(msgType + " is not fully termimated in 200 MS");
-          }
-        } catch (InterruptedException e) {
-          LOG.error("Interrupted", e);
-        }
-      }
-    }
+    reset(); // unregisters every message type in _executorMap, then clears _taskMap
+
     _monitor.shutDown();
-    LOG.info("shutdown finished");
+    LOG.info("Shutdown HelixTaskExecutor finished");
   }
 }