You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2022/07/20 22:34:41 UTC

[helix] branch master updated: Add TASK_THREADPOOL_RESET_TIMEOUT as system property (#2177)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 49aef7c54 Add TASK_THREADPOOL_RESET_TIMEOUT as system property (#2177)
49aef7c54 is described below

commit 49aef7c54a7f93f119f0b921068235d3dd52cd25
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Wed Jul 20 15:34:35 2022 -0700

    Add TASK_THREADPOOL_RESET_TIMEOUT as system property (#2177)
    
    Add TASK_THREADPOOL_RESET_TIMEOUT as system property
    
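    Allow users to specify the task executor thread pool reset timeout (in ms)
    via the "helixTask.threadpool.resetTimeout" system property. When the
    property is not set, the default of 200 ms is used.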
---
 .../main/java/org/apache/helix/SystemPropertyKeys.java |  3 +++
 .../helix/messaging/DefaultMessagingService.java       | 18 ++++++++++++++++--
 .../helix/messaging/handling/HelixTaskExecutor.java    |  2 --
 .../apache/helix/messaging/handling/TaskExecutor.java  |  1 +
 .../helix/messaging/TestDefaultMessagingService.java   | 13 +++++++++++++
 5 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
index 4fe651c5f..75e8c0316 100644
--- a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -34,6 +34,9 @@ public class SystemPropertyKeys {
   // Task Driver
   public static final String TASK_CONFIG_LIMITATION = "helixTask.configsLimitation";
 
+  // Task executor threadpool reset timeout in ms
+  public static final String TASK_THREADPOOL_RESET_TIMEOUT = "helixTask.threadpool.resetTimeout";
+
   // ZKHelixManager
   public static final String CLUSTER_MANAGER_VERSION = "cluster-manager-version.properties";
 
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index c5b093894..321ff78ee 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -19,6 +19,7 @@ package org.apache.helix.messaging;
  * under the License.
  */
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,10 +35,12 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.messaging.handling.AsyncCallbackService;
 import org.apache.helix.messaging.handling.HelixTaskExecutor;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.TaskExecutor;
 import org.apache.helix.model.ConfigScope;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
@@ -45,6 +48,7 @@ import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.builder.ConfigScopeBuilder;
 import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
 import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
+import org.apache.helix.util.HelixUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +58,7 @@ public class DefaultMessagingService implements ClusterMessagingService {
   private final HelixTaskExecutor _taskExecutor;
   // TODO:rename to factory, this is not a service
   private final AsyncCallbackService _asyncCallbackService;
+  private final int _taskThreadpoolResetTimeout;
 
   private static Logger _logger = LoggerFactory.getLogger(DefaultMessagingService.class);
   ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded =
@@ -72,8 +77,12 @@ public class DefaultMessagingService implements ClusterMessagingService {
         new ParticipantStatusMonitor(isParticipant, manager.getInstanceName()),
         new MessageQueueMonitor(manager.getClusterName(), manager.getInstanceName()));
     _asyncCallbackService = new AsyncCallbackService();
-    _taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.name(),
-        _asyncCallbackService);
+
+    _taskThreadpoolResetTimeout = HelixUtil
+        .getSystemPropertyAsInt(SystemPropertyKeys.TASK_THREADPOOL_RESET_TIMEOUT,
+            TaskExecutor.DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS);
+    _taskExecutor.registerMessageHandlerFactory(_asyncCallbackService, TaskExecutor.DEFAULT_PARALLEL_TASKS,
+        _taskThreadpoolResetTimeout);
   }
 
   @Override
@@ -335,6 +344,11 @@ public class DefaultMessagingService implements ClusterMessagingService {
     return _taskExecutor;
   }
 
+  @VisibleForTesting
+  int getTaskThreadpoolResetTimeout() {
+    return _taskThreadpoolResetTimeout;
+  }
+
   @Override
   // TODO if the manager is not Participant or Controller, no reply, so should fail immediately
   public int sendAndWait(Criteria recipientCriteria, Message message, AsyncCallback asyncCallback,
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 52467d8b4..80b70bbc3 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -144,8 +144,6 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   private static final int SESSION_SYNC_INTERVAL = 2000; // 2 seconds
   private static final String SESSION_SYNC = "SESSION-SYNC";
 
-  private static final int DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS = 200; // 200 ms
-
   /**
    * Map of MsgType->MsgHandlerFactoryRegistryItem
    */
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java
index e9fa424df..56dc004bf 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 
 public interface TaskExecutor {
   int DEFAULT_PARALLEL_TASKS = 40;
+  int DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS = 200;
 
   /**
    * Register MultiType message handler factory that the executor can handle.
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
index 221152d23..bf98d5353 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
@@ -34,10 +34,12 @@ import org.apache.helix.MockAccessor;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyType;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
+import org.apache.helix.messaging.handling.TaskExecutor;
 import org.apache.helix.mock.MockManager;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
@@ -291,4 +293,15 @@ public class TestDefaultMessagingService {
     Assert.assertTrue(
         svc.getMessageHandlerFactoryMap().containsKey(Message.MessageType.CONTROLLER_MSG.name()));
   }
+
+  @Test
+  public void testTaskThreadpoolResetTimeoutProperty() {
+    HelixManager manager = new MockManager();
+    System.setProperty(SystemPropertyKeys.TASK_THREADPOOL_RESET_TIMEOUT, "300");
+    MockDefaultMessagingService svc = new MockDefaultMessagingService(manager);
+    Assert.assertEquals(svc.getTaskThreadpoolResetTimeout(), 300);
+    System.clearProperty(SystemPropertyKeys.TASK_THREADPOOL_RESET_TIMEOUT);
+    svc = new MockDefaultMessagingService(new MockManager());
+    Assert.assertEquals(svc.getTaskThreadpoolResetTimeout(), TaskExecutor.DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS);
+  }
 }