You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2021/12/14 00:51:47 UTC

[helix] branch master updated: Make threadpool shutdown timeout configurable for the HelixTaskExecutor. (#1920)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ed4163  Make threadpool shutdown timeout configurable for the HelixTaskExecutor. (#1920)
5ed4163 is described below

commit 5ed41632526a6fef26bde3eacd70b0e30b7c618a
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Mon Dec 13 16:51:39 2021 -0800

    Make threadpool shutdown timeout configurable for the HelixTaskExecutor. (#1920)
    
    Add TestHelixTaskExecutor.testHandlerResetTimeout() to cover the new changes.
    Also refactor the related code to reduce duplicated and confusing code.
---
 .../apache/helix/examples/BootstrapProcess.java    |  6 --
 .../zk/DefaultControllerMessageHandlerFactory.java |  7 +-
 ...faultParticipantErrorMessageHandlerFactory.java |  7 +-
 .../zk/DefaultSchedulerMessageHandlerFactory.java  |  7 +-
 .../messaging/handling/AsyncCallbackService.java   |  5 --
 .../messaging/handling/HelixTaskExecutor.java      | 72 ++++++++++++------
 .../messaging/handling/MessageHandlerFactory.java  |  2 +
 .../handling/MultiTypeMessageHandlerFactory.java   | 11 +++
 .../helix/messaging/handling/TaskExecutor.java     | 46 +++++++-----
 .../helix/participant/HelixStateMachineEngine.java |  5 --
 .../helix/integration/TestZkSessionExpiry.java     |  7 +-
 .../messaging/TestMessagingService.java            |  5 --
 .../messaging/TestSchedulerMessage.java            | 10 ---
 .../messaging/TestDefaultMessagingService.java     | 11 ---
 .../handling/TestConfigThreadpoolSize.java         | 10 ---
 .../messaging/handling/TestHelixTaskExecutor.java  | 87 ++++++++++++++++------
 16 files changed, 159 insertions(+), 139 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java b/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
index e0809e7..665b1fe 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/BootstrapProcess.java
@@ -124,12 +124,6 @@ public class BootstrapProcess {
       return new CustomMessageHandler(message, context);
     }
 
-    @Deprecated
-    @Override
-    public String getMessageType() {
-      return MessageType.USER_DEFINE_MSG.name();
-    }
-
     @Override
     public List<String> getMessageTypes() {
       return ImmutableList.of(MessageType.USER_DEFINE_MSG.name());
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
index 4223d2d..0d534b6 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultControllerMessageHandlerFactory.java
@@ -39,7 +39,7 @@ public class DefaultControllerMessageHandlerFactory implements MultiTypeMessageH
   public MessageHandler createHandler(Message message, NotificationContext context) {
     String type = message.getMsgType();
 
-    if (!type.equals(getMessageType())) {
+    if (!getMessageTypes().contains(type)) {
       throw new HelixException("Unexpected msg type for message " + message.getMsgId() + " type:"
           + message.getMsgType());
     }
@@ -48,11 +48,6 @@ public class DefaultControllerMessageHandlerFactory implements MultiTypeMessageH
   }
 
   @Override
-  public String getMessageType() {
-    return MessageType.CONTROLLER_MSG.name();
-  }
-
-  @Override
   public List<String> getMessageTypes() {
     return ImmutableList.of(MessageType.CONTROLLER_MSG.name());
   }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
index 67c6f64..eface61 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultParticipantErrorMessageHandlerFactory.java
@@ -111,7 +111,7 @@ public class DefaultParticipantErrorMessageHandlerFactory implements MultiTypeMe
   public MessageHandler createHandler(Message message, NotificationContext context) {
     String type = message.getMsgType();
 
-    if (!type.equals(getMessageType())) {
+    if (!getMessageTypes().contains(type)) {
       throw new HelixException("Unexpected msg type for message " + message.getMsgId() + " type:"
           + message.getMsgType());
     }
@@ -120,11 +120,6 @@ public class DefaultParticipantErrorMessageHandlerFactory implements MultiTypeMe
   }
 
   @Override
-  public String getMessageType() {
-    return Message.MessageType.PARTICIPANT_ERROR_REPORT.name();
-  }
-
-  @Override
   public List<String> getMessageTypes() {
     return ImmutableList.of(Message.MessageType.PARTICIPANT_ERROR_REPORT.name());
   }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
index fb539f9..7df5615 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
@@ -131,7 +131,7 @@ public class DefaultSchedulerMessageHandlerFactory implements MultiTypeMessageHa
   public MessageHandler createHandler(Message message, NotificationContext context) {
     String type = message.getMsgType();
 
-    if (!type.equals(getMessageType())) {
+    if (!getMessageTypes().contains(type)) {
       throw new HelixException("Unexpected msg type for message " + message.getMsgId() + " type:"
           + message.getMsgType());
     }
@@ -140,11 +140,6 @@ public class DefaultSchedulerMessageHandlerFactory implements MultiTypeMessageHa
   }
 
   @Override
-  public String getMessageType() {
-    return MessageType.SCHEDULER_MSG.name();
-  }
-
-  @Override
   public List<String> getMessageTypes() {
     return ImmutableList.of(MessageType.SCHEDULER_MSG.name());
   }
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
index 6b73b1a..fa15552 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/AsyncCallbackService.java
@@ -81,11 +81,6 @@ public class AsyncCallbackService implements MultiTypeMessageHandlerFactory {
   }
 
   @Override
-  public String getMessageType() {
-    return MessageType.TASK_REPLY.name();
-  }
-
-  @Override
   public List<String> getMessageTypes() {
     return ImmutableList.of(MessageType.TASK_REPLY.name());
   }
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index b981a98..52467d8 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -89,8 +89,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   class MsgHandlerFactoryRegistryItem {
     private final MessageHandlerFactory _factory;
     private final int _threadPoolSize;
+    private final int _resetTimeout;
 
-    public MsgHandlerFactoryRegistryItem(MessageHandlerFactory factory, int threadPoolSize) {
+    public MsgHandlerFactoryRegistryItem(MessageHandlerFactory factory, int threadPoolSize, int resetTimeout) {
       if (factory == null) {
         throw new NullPointerException("Message handler factory is null");
       }
@@ -99,14 +100,23 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         throw new IllegalArgumentException("Illegal thread pool size: " + threadPoolSize);
       }
 
+      if (resetTimeout < 0) {
+        throw new IllegalArgumentException("Illegal reset timeout: " + resetTimeout);
+      }
+
       _factory = factory;
       _threadPoolSize = threadPoolSize;
+      _resetTimeout = resetTimeout;
     }
 
     int threadPoolSize() {
       return _threadPoolSize;
     }
 
+    int getResetTimeout() {
+      return _resetTimeout;
+    }
+
     MessageHandlerFactory factory() {
       return _factory;
     }
@@ -118,7 +128,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   // TODO: we need to further design how to throttle this.
   // From storage point of view, only bootstrap case is expensive
   // and we need to throttle, which is mostly IO / network bounded.
-  public static final int DEFAULT_PARALLEL_TASKS = 40;
+  public static final int DEFAULT_PARALLEL_TASKS = TaskExecutor.DEFAULT_PARALLEL_TASKS;
   // TODO: create per-task type threadpool with customizable pool size
   protected final Map<String, MessageTaskInfo> _taskMap;
   private final Object _lock;
@@ -133,6 +143,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   private LiveInstanceStatus _liveInstanceStatus;
   private static final int SESSION_SYNC_INTERVAL = 2000; // 2 seconds
   private static final String SESSION_SYNC = "SESSION-SYNC";
+
+  private static final int DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS = 200; // 200 ms
+
   /**
    * Map of MsgType->MsgHandlerFactoryRegistryItem
    */
@@ -193,13 +206,25 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   }
 
   @Override
+  public void registerMessageHandlerFactory(MultiTypeMessageHandlerFactory factory,
+      int threadPoolSize, int resetTimeoutMs) {
+    for (String type : factory.getMessageTypes()) {
+      registerMessageHandlerFactory(type, factory, threadPoolSize, resetTimeoutMs);
+    }
+  }
+
+  @Override
   public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory) {
     registerMessageHandlerFactory(type, factory, DEFAULT_PARALLEL_TASKS);
   }
 
   @Override
-  public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
-      int threadpoolSize) {
+  public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory, int threadpoolSize) {
+    registerMessageHandlerFactory(type, factory, threadpoolSize, DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS);
+  }
+
+  private void registerMessageHandlerFactory(String type, MessageHandlerFactory factory, int threadpoolSize,
+      int resetTimeoutMs) {
     if (factory instanceof MultiTypeMessageHandlerFactory) {
       if (!((MultiTypeMessageHandlerFactory) factory).getMessageTypes().contains(type)) {
         throw new HelixException("Message factory type mismatch. Type: " + type + ", factory: "
@@ -215,8 +240,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
 
     _isShuttingDown = false;
 
-    MsgHandlerFactoryRegistryItem newItem =
-        new MsgHandlerFactoryRegistryItem(factory, threadpoolSize);
+    MsgHandlerFactoryRegistryItem newItem = new MsgHandlerFactoryRegistryItem(factory, threadpoolSize, resetTimeoutMs);
     MsgHandlerFactoryRegistryItem prevItem = _hdlrFtyRegistry.putIfAbsent(type, newItem);
     if (prevItem == null) {
       _executorMap.computeIfAbsent(type, msgType -> {
@@ -225,14 +249,12 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         _monitor.createExecutorMonitor(type, newPool);
         return newPool;
       });
-      LOG.info(
-          "Registered message handler factory for type: " + type + ", poolSize: " + threadpoolSize
-              + ", factory: " + factory + ", pool: " + _executorMap.get(type));
+      LOG.info("Registered message handler factory for type: {}, poolSize: {}, factory: {}, pool: {}", type,
+          threadpoolSize, factory, _executorMap.get(type));
     } else {
-      LOG.info("Skip register message handler factory for type: " + type + ", poolSize: "
-          + threadpoolSize + ", factory: " + factory + ", already existing factory: " + prevItem
-          .factory());
-      newItem = null;
+      LOG.info(
+          "Skip register message handler factory for type: {}, poolSize: {}, factory: {}, already existing factory: {}",
+          type, threadpoolSize, factory, prevItem.factory());
     }
   }
 
@@ -586,22 +608,26 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     }
   }
 
-  private void shutdownAndAwaitTermination(ExecutorService pool) {
+  private void shutdownAndAwaitTermination(ExecutorService pool, MsgHandlerFactoryRegistryItem handlerItem) {
     LOG.info("Shutting down pool: " + pool);
+
+    int timeout = handlerItem == null? DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS : handlerItem.getResetTimeout();
+
     pool.shutdown(); // Disable new tasks from being submitted
     try {
       // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(200, TimeUnit.MILLISECONDS)) {
+      if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
         List<Runnable> waitingTasks = pool.shutdownNow(); // Cancel currently executing tasks
-        LOG.info("Tasks that never commenced execution: " + waitingTasks);
+        LOG.info("Tasks that never commenced execution after {}: {}", timeout,
+            waitingTasks);
         // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(200, TimeUnit.MILLISECONDS)) {
-          LOG.error("Pool did not fully terminate in 200ms. pool: " + pool);
+        if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
+          LOG.error("Pool did not fully terminate in {} ms. pool: {}", timeout, pool);
         }
       }
     } catch (InterruptedException ie) {
       // (Re-)Cancel if current thread also interrupted
-      LOG.error("Interruped when waiting for shutdown pool: " + pool, ie);
+      LOG.error("Interrupted when waiting for shutdown pool: " + pool, ie);
       pool.shutdownNow();
       // Preserve interrupt status
       Thread.currentThread().interrupt();
@@ -622,7 +648,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
             + ", pool: " + pool);
 
     if (pool != null) {
-      shutdownAndAwaitTermination(pool);
+      shutdownAndAwaitTermination(pool, item);
     }
 
     // reset state-model
@@ -663,14 +689,14 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     synchronized (_hdlrFtyRegistry) {
       for (String msgType : _hdlrFtyRegistry.keySet()) {
         // don't un-register factories, just shutdown all executors
+        MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.get(msgType);
         ExecutorService pool = _executorMap.remove(msgType);
         _monitor.removeExecutorMonitor(msgType);
         if (pool != null) {
-          LOG.info("Reset exectuor for msgType: " + msgType + ", pool: " + pool);
-          shutdownAndAwaitTermination(pool);
+          LOG.info("Reset executor for msgType: " + msgType + ", pool: " + pool);
+          shutdownAndAwaitTermination(pool, item);
         }
 
-        MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.get(msgType);
         if (item.factory() != null) {
           try {
             item.factory().reset();
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
index b402c29..1b13909 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
@@ -36,6 +36,8 @@ public interface MessageHandlerFactory {
    */
   MessageHandler createHandler(Message message, NotificationContext context);
 
+  @Deprecated
+  // Please update the logic to implement MultiTypeMessageHandlerFactory.getMessageTypes instead.
   String getMessageType();
 
   void reset();
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/MultiTypeMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/MultiTypeMessageHandlerFactory.java
index b14e10e..a4d34fc 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/MultiTypeMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/MultiTypeMessageHandlerFactory.java
@@ -26,4 +26,15 @@ public interface MultiTypeMessageHandlerFactory extends MessageHandlerFactory {
 
   List<String> getMessageTypes();
 
+  @Override
+  @Deprecated
+  // TODO remove this default implementation for backward compatibility when clean up the legacy code.
+  default String getMessageType() {
+    List<String> types = getMessageTypes();
+    if (types == null || types.isEmpty()) {
+      return null;
+    } else {
+      return types.get(0);
+    }
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java
index 94aad69..e9fa424 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java
@@ -24,62 +24,72 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 public interface TaskExecutor {
-  public static final int DEFAULT_PARALLEL_TASKS = 40;
+  int DEFAULT_PARALLEL_TASKS = 40;
 
   /**
-   * register message handler factory this executor can handle
+   * Register MultiType message handler factory that the executor can handle.
+   * @param factory MultiType message handler factory
+   * @param threadPoolSize Threadpool size of the corresponding execute service.
+   * @param resetTimeoutMs Timeout when wait for the execute service to be shutdown.
+   */
+  default void registerMessageHandlerFactory(MultiTypeMessageHandlerFactory factory,
+      int threadPoolSize, int resetTimeoutMs) {
+    throw new UnsupportedOperationException("This method has not been implemented.");
+  }
+
+  /**
+   * Register message handler factory this executor can handle
    * @param type
    * @param factory
    */
-  public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory);
+  @Deprecated
+  void registerMessageHandlerFactory(String type, MessageHandlerFactory factory);
 
   /**
-   * register message handler factory this executor can handle with specified
+   * Register message handler factory this executor can handle with specified
    * thread-pool size
    * @param type
    * @param factory
-   * @param threadpoolSize
+   * @param threadPoolSize
    */
-  public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
+  @Deprecated
+  void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
       int threadPoolSize);
 
   /**
    * schedule a message execution
-   * @param message
-   * @param handler
-   * @param context
+   * @param task
    */
-  public boolean scheduleTask(MessageTask task);
+  boolean scheduleTask(MessageTask task);
 
   /**
    * blocking on scheduling all tasks
    * @param tasks
    */
-  public List<Future<HelixTaskResult>> invokeAllTasks(List<MessageTask> tasks, long timeout,
+  List<Future<HelixTaskResult>> invokeAllTasks(List<MessageTask> tasks, long timeout,
       TimeUnit unit) throws InterruptedException;
 
   /**
    * cancel a message execution
-   * @param message
-   * @param context
+   * @param task
    */
-  public boolean cancelTask(MessageTask task);
+  boolean cancelTask(MessageTask task);
 
   /**
    * cancel the timeout for the given task
    * @param task
    * @return
    */
-  public boolean cancelTimeoutTask(MessageTask task);
+  boolean cancelTimeoutTask(MessageTask task);
 
   /**
    * finish a message execution
-   * @param message
+   * @param task
    */
-  public void finishTask(MessageTask task);
+  void finishTask(MessageTask task);
 
   /**
    * shutdown executor
    */
-  public void shutdown();
+  void shutdown();
 }
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index e421638..50a0782 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -295,11 +295,6 @@ public class HelixStateMachineEngine implements StateMachineEngine {
     }
   }
 
-  @Override
-  public String getMessageType() {
-    return MessageType.STATE_TRANSITION.name();
-  }
-
   @Override public List<String> getMessageTypes() {
     return ImmutableList
         .of(MessageType.STATE_TRANSITION.name(), MessageType.STATE_TRANSITION_CANCELLATION.name());
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
index 5012de0..e264920 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
@@ -80,12 +80,7 @@ public class TestZkSessionExpiry extends ZkUnitTestBase {
     public MessageHandler createHandler(Message message, NotificationContext context) {
       return new DummyMessageHandler(message, context, _handledMsgSet);
     }
-
-    @Override
-    public String getMessageType() {
-      return DUMMY_MSG_TYPE;
-    }
-
+    
     @Override
     public List<String> getMessageTypes() {
       return ImmutableList.of(DUMMY_MSG_TYPE);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessagingService.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessagingService.java
index 902f595..873bffc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessagingService.java
@@ -49,11 +49,6 @@ public class TestMessagingService extends ZkStandAloneCMTestBase {
     }
 
     @Override
-    public String getMessageType() {
-      return "TestExtensibility";
-    }
-
-    @Override
     public List<String> getMessageTypes() {
       return ImmutableList.of("TestExtensibility");
     }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestSchedulerMessage.java
index a897f1d..a4878e9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestSchedulerMessage.java
@@ -89,11 +89,6 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
     }
 
     @Override
-    public String getMessageType() {
-      return "TestParticipant";
-    }
-
-    @Override
     public List<String> getMessageTypes() {
       return ImmutableList.of("TestParticipant");
     }
@@ -148,11 +143,6 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
     }
 
     @Override
-    public String getMessageType() {
-      return "TestMessagingHandlerLatch";
-    }
-
-    @Override
     public List<String> getMessageTypes() {
       return ImmutableList.of("TestMessagingHandlerLatch");
     }
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
index c554742..221152d 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
@@ -154,12 +154,6 @@ public class TestDefaultMessagingService {
       return new TestMessageHandler(message, context);
     }
 
-    @Override
-    public String getMessageType() {
-      // TODO Auto-generated method stub
-      return "TestingMessageHandler";
-    }
-
     @Override public List<String> getMessageTypes() {
       return ImmutableList.of("TestingMessageHandler");
     }
@@ -179,11 +173,6 @@ public class TestDefaultMessagingService {
     }
 
     @Override
-    public String getMessageType() {
-      return null;
-    }
-
-    @Override
     public List<String> getMessageTypes() {
       return ImmutableList.of(Message.MessageType.STATE_TRANSITION.name(),
           Message.MessageType.STATE_TRANSITION_CANCELLATION.name(),
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
index 8586880..06830e9 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
@@ -44,11 +44,6 @@ public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase {
       return null;
     }
 
-    @Override
-    public String getMessageType() {
-      return "TestMsg";
-    }
-
     @Override public List<String> getMessageTypes() {
       return ImmutableList.of("TestMsg");
     }
@@ -68,11 +63,6 @@ public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase {
       return null;
     }
 
-    @Override
-    public String getMessageType() {
-      return "TestMsg2";
-    }
-
     @Override public List<String> getMessageTypes() {
       return ImmutableList.of("TestMsg2");
     }
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index 59d6b06..7b23990 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -28,6 +28,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 
 import com.google.common.collect.ImmutableList;
@@ -70,8 +71,18 @@ public class TestHelixTaskExecutor {
   }
 
   class TestMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
+    final int _messageDelay;
     int _handlersCreated = 0;
     ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<>();
+    ConcurrentSkipListSet<String> _completedMsgIds = new ConcurrentSkipListSet<>();
+
+    TestMessageHandlerFactory(int messageDelay) {
+      _messageDelay = messageDelay;
+    }
+
+    TestMessageHandlerFactory() {
+      _messageDelay = 100;
+    }
 
     class TestMessageHandler extends MessageHandler {
       public TestMessageHandler(Message message, NotificationContext context) {
@@ -82,8 +93,9 @@ public class TestHelixTaskExecutor {
       public HelixTaskResult handleMessage() throws InterruptedException {
         HelixTaskResult result = new HelixTaskResult();
         _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
-        Thread.sleep(100);
+        Thread.sleep(_messageDelay);
         result.setSuccess(true);
+        _completedMsgIds.add(_message.getMsgId());
         return result;
       }
 
@@ -103,11 +115,6 @@ public class TestHelixTaskExecutor {
     }
 
     @Override
-    public String getMessageType() {
-      return "TestingMessageHandler";
-    }
-
-    @Override
     public List<String> getMessageTypes() {
       return Collections.singletonList("TestingMessageHandler");
     }
@@ -120,15 +127,9 @@ public class TestHelixTaskExecutor {
 
   class TestMessageHandlerFactory2 extends TestMessageHandlerFactory {
     @Override
-    public String getMessageType() {
-      return "TestingMessageHandler2";
-    }
-
-    @Override
     public List<String> getMessageTypes() {
       return ImmutableList.of("TestingMessageHandler2");
     }
-
   }
 
   class CancellableHandlerFactory implements MultiTypeMessageHandlerFactory {
@@ -186,11 +187,6 @@ public class TestHelixTaskExecutor {
       return new CancellableHandler(message, context);
     }
 
-    @Override
-    public String getMessageType() {
-      return "Cancellable";
-    }
-
     @Override public List<String> getMessageTypes() {
       return ImmutableList.of("Cancellable");
     }
@@ -268,11 +264,6 @@ public class TestHelixTaskExecutor {
     }
 
     @Override
-    public String getMessageType() {
-      return _msgType;
-    }
-
-    @Override
     public List<String> getMessageTypes() {
       return ImmutableList.of(_msgType);
     }
@@ -755,6 +746,58 @@ public class TestHelixTaskExecutor {
     System.out.println("END TestCMTaskExecutor.testShutdown()");
   }
 
+  @Test(dependsOnMethods = "testShutdown")
+  public void testHandlerResetTimeout() throws Exception {
+    System.out.println("START TestCMTaskExecutor.testHandlerResetTimeout()");
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+
+    int messageDelay = 2 * 1000; // 2 seconds
+    TestMessageHandlerFactory factory = new TestMessageHandlerFactory(messageDelay);
+
+    // Execute a message with short reset timeout
+    int shortTimeout = 100; // 100 ms
+    executor.registerMessageHandlerFactory(factory, HelixTaskExecutor.DEFAULT_PARALLEL_TASKS, shortTimeout);
+
+    final Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
+    msg.setTgtSessionId("*");
+    msg.setTgtName("Localhost_1123");
+    msg.setSrcName("127.101.1.23_2234");
+
+    NotificationContext changeContext = new NotificationContext(manager);
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
+    executor.onMessage("some", Arrays.asList(msg), changeContext);
+    Assert.assertTrue(
+        TestHelper.verify(() -> factory._processedMsgIds.containsKey(msg.getMsgId()), TestHelper.WAIT_DURATION));
+    executor.shutdown();
+    for (ExecutorService svc : executor._executorMap.values()) {
+      Assert.assertTrue(svc.isShutdown());
+    }
+    Assert.assertEquals(factory._completedMsgIds.size(), 0);
+
+    // Execute a message with proper reset timeout, so it will wait enough time until the message is processed.
+    executor = new HelixTaskExecutor();
+    int longTimeout = messageDelay * 2; // 4 seconds
+    executor.registerMessageHandlerFactory(factory, HelixTaskExecutor.DEFAULT_PARALLEL_TASKS, longTimeout);
+
+    final Message msg2 = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
+    msg2.setTgtSessionId("*");
+    msg2.setTgtName("Localhost_1123");
+    msg2.setSrcName("127.101.1.23_2234");
+    executor.onMessage("some", Arrays.asList(msg2), changeContext);
+
+    Assert.assertTrue(
+        TestHelper.verify(() -> factory._processedMsgIds.containsKey(msg2.getMsgId()), TestHelper.WAIT_DURATION));
+    executor.shutdown();
+    for (ExecutorService svc : executor._executorMap.values()) {
+      Assert.assertTrue(svc.isShutdown());
+    }
+    Assert.assertEquals(factory._completedMsgIds.size(), 1);
+    Assert.assertTrue(factory._completedMsgIds.contains(msg2.getMsgId()));
+
+    System.out.println("END TestCMTaskExecutor.testHandlerResetTimeout()");
+  }
+
   @Test()
   public void testNoRetry() throws InterruptedException {
     System.out.println("START " + TestHelper.getTestMethodName());