You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2022/07/20 22:34:41 UTC
[helix] branch master updated: Add TASK_THREADPOOL_RESET_TIMEOUT as system property (#2177)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 49aef7c54 Add TASK_THREADPOOL_RESET_TIMEOUT as system property (#2177)
49aef7c54 is described below
commit 49aef7c54a7f93f119f0b921068235d3dd52cd25
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Wed Jul 20 15:34:35 2022 -0700
Add TASK_THREADPOOL_RESET_TIMEOUT as system property (#2177)
Add TASK_THREADPOOL_RESET_TIMEOUT as system property
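Allow users to specify the task executor thread pool reset timeout (in ms) through the helixTask.threadpool.resetTimeout system property; when the property is unset, the default of 200 ms is used.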
---
.../main/java/org/apache/helix/SystemPropertyKeys.java | 3 +++
.../helix/messaging/DefaultMessagingService.java | 18 ++++++++++++++++--
.../helix/messaging/handling/HelixTaskExecutor.java | 2 --
.../apache/helix/messaging/handling/TaskExecutor.java | 1 +
.../helix/messaging/TestDefaultMessagingService.java | 13 +++++++++++++
5 files changed, 33 insertions(+), 4 deletions(-)
diff --git a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
index 4fe651c5f..75e8c0316 100644
--- a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -34,6 +34,9 @@ public class SystemPropertyKeys {
// Task Driver
public static final String TASK_CONFIG_LIMITATION = "helixTask.configsLimitation";
+ // Task executor threadpool reset timeout in ms
+ public static final String TASK_THREADPOOL_RESET_TIMEOUT = "helixTask.threadpool.resetTimeout";
+
// ZKHelixManager
public static final String CLUSTER_MANAGER_VERSION = "cluster-manager-version.properties";
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index c5b093894..321ff78ee 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -19,6 +19,7 @@ package org.apache.helix.messaging;
* under the License.
*/
+import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -34,10 +35,12 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.messaging.handling.AsyncCallbackService;
import org.apache.helix.messaging.handling.HelixTaskExecutor;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.messaging.handling.TaskExecutor;
import org.apache.helix.model.ConfigScope;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
@@ -45,6 +48,7 @@ import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.builder.ConfigScopeBuilder;
import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
+import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +58,7 @@ public class DefaultMessagingService implements ClusterMessagingService {
private final HelixTaskExecutor _taskExecutor;
// TODO:rename to factory, this is not a service
private final AsyncCallbackService _asyncCallbackService;
+ private final int _taskThreadpoolResetTimeout;
private static Logger _logger = LoggerFactory.getLogger(DefaultMessagingService.class);
ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded =
@@ -72,8 +77,12 @@ public class DefaultMessagingService implements ClusterMessagingService {
new ParticipantStatusMonitor(isParticipant, manager.getInstanceName()),
new MessageQueueMonitor(manager.getClusterName(), manager.getInstanceName()));
_asyncCallbackService = new AsyncCallbackService();
- _taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.name(),
- _asyncCallbackService);
+
+ _taskThreadpoolResetTimeout = HelixUtil
+ .getSystemPropertyAsInt(SystemPropertyKeys.TASK_THREADPOOL_RESET_TIMEOUT,
+ TaskExecutor.DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS);
+ _taskExecutor.registerMessageHandlerFactory(_asyncCallbackService, TaskExecutor.DEFAULT_PARALLEL_TASKS,
+ _taskThreadpoolResetTimeout);
}
@Override
@@ -335,6 +344,11 @@ public class DefaultMessagingService implements ClusterMessagingService {
return _taskExecutor;
}
+ @VisibleForTesting
+ int getTaskThreadpoolResetTimeout() {
+ return _taskThreadpoolResetTimeout;
+ }
+
@Override
// TODO if the manager is not Participant or Controller, no reply, so should fail immediately
public int sendAndWait(Criteria recipientCriteria, Message message, AsyncCallback asyncCallback,
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 52467d8b4..80b70bbc3 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -144,8 +144,6 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
private static final int SESSION_SYNC_INTERVAL = 2000; // 2 seconds
private static final String SESSION_SYNC = "SESSION-SYNC";
- private static final int DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS = 200; // 200 ms
-
/**
* Map of MsgType->MsgHandlerFactoryRegistryItem
*/
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java
index e9fa424df..56dc004bf 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/TaskExecutor.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
public interface TaskExecutor {
int DEFAULT_PARALLEL_TASKS = 40;
+ int DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS = 200;
/**
* Register MultiType message handler factory that the executor can handle.
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
index 221152d23..bf98d5353 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java
@@ -34,10 +34,12 @@ import org.apache.helix.MockAccessor;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
+import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
+import org.apache.helix.messaging.handling.TaskExecutor;
import org.apache.helix.mock.MockManager;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
@@ -291,4 +293,15 @@ public class TestDefaultMessagingService {
Assert.assertTrue(
svc.getMessageHandlerFactoryMap().containsKey(Message.MessageType.CONTROLLER_MSG.name()));
}
+
+ @Test
+ public void testTaskThreadpoolResetTimeoutProperty() {
+ HelixManager manager = new MockManager();
+ System.setProperty(SystemPropertyKeys.TASK_THREADPOOL_RESET_TIMEOUT, "300");
+ MockDefaultMessagingService svc = new MockDefaultMessagingService(manager);
+ Assert.assertEquals(svc.getTaskThreadpoolResetTimeout(), 300);
+ System.clearProperty(SystemPropertyKeys.TASK_THREADPOOL_RESET_TIMEOUT);
+ svc = new MockDefaultMessagingService(new MockManager());
+ Assert.assertEquals(svc.getTaskThreadpoolResetTimeout(), TaskExecutor.DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS);
+ }
}