You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/03/06 18:05:37 UTC
[helix] branch master updated: Add API for users to provide customized threadpool for different categories of state transition messages(#2390)
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 1c2ca7144 Add API for users to provide customized threadpool for different categories of state transition messages(#2390)
1c2ca7144 is described below
commit 1c2ca71441383482d757a05b9d546de29231cab2
Author: Molly Gao <31...@users.noreply.github.com>
AuthorDate: Mon Mar 6 10:05:30 2023 -0800
Add API for users to provide customized threadpool for different categories of state transition messages(#2390)
Add API for users to provide customized threadpool for different categories of state transition messages
---------
Co-authored-by: Molly Gao <mg...@mgao-mn2.linkedin.biz>
---
.../stages/IntermediateStateCalcStage.java | 2 ++
.../messaging/handling/HelixTaskExecutor.java | 33 ++++++++++++------
.../main/java/org/apache/helix/model/Message.java | 37 ++++++++++++++++++--
.../statemachine/StateModelFactory.java | 39 ++++++++++++++++++++++
.../messaging/handling/TestHelixTaskExecutor.java | 34 ++++++++++++++++---
5 files changed, 127 insertions(+), 18 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 835b19be5..d16d6df4c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -379,10 +379,12 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
// Number of states required by StateModelDefinition are not satisfied, need recovery
if (rebalanceType.equals(RebalanceType.RECOVERY_BALANCE)) {
+ message.setSTRebalanceType(Message.STRebalanceType.RECOVERY_REBALANCE);
messagesForRecovery.add(message.getId());
recoveryRebalance(resource, partition, throttleController, message, cache,
messagesThrottledForRecovery, resourceMessageMap);
} else if (rebalanceType.equals(RebalanceType.LOAD_BALANCE)) {
+ message.setSTRebalanceType(Message.STRebalanceType.LOAD_REBALANCE);
messagesForLoad.add(message.getId());
loadRebalance(resource, partition, throttleController, message, cache,
onlyDownwardLoadBalance, stateModelDef, messagesThrottledForLoad, resourceMessageMap);
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 4a96f111a..3e715e368 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -163,6 +163,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
/* Resources whose configuration for dedicate thread pool has been checked.*/
final Set<String> _resourcesThreadpoolChecked;
final Set<String> _transitionTypeThreadpoolChecked;
+ final Set<String> _msgInfoBasedThreadpoolChecked;
// timer for schedule timeout tasks
final Timer _timer;
@@ -193,6 +194,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
_resourcesThreadpoolChecked = Collections.newSetFromMap(new ConcurrentHashMap<>());
_transitionTypeThreadpoolChecked = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ _msgInfoBasedThreadpoolChecked = Collections.newSetFromMap(new ConcurrentHashMap<>());
_lock = new Object();
_statusUpdateUtil = new StatusUpdateUtil();
@@ -290,6 +292,19 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
manager.getStateMachineEngine().getStateModelFactory(stateModelName, factoryName);
Message.MessageInfo msgInfo = new Message.MessageInfo(message);
+ if (stateModelFactory != null) {
+ StateModelFactory.CustomizedExecutorService customizedExecutorService =
+ stateModelFactory.getExecutorService(msgInfo);
+ if (customizedExecutorService != null) {
+ String msgInfoBasedKey = msgInfo.getMessageIdentifier(customizedExecutorService.getBase());
+ if (msgInfoBasedKey != null) {
+ _msgInfoBasedThreadpoolChecked.add(msgInfoBasedKey);
+ _executorMap.put(msgInfoBasedKey, customizedExecutorService.getExecutorService());
+ return;
+ }
+ }
+ }
+
String perStateTransitionTypeKey =
msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_STATE_TRANSITION_TYPE);
if (perStateTransitionTypeKey != null && stateModelFactory != null && !_transitionTypeThreadpoolChecked.contains(
@@ -361,17 +376,13 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
executorService = _batchMessageExecutorService;
} else {
Message.MessageInfo msgInfo = new Message.MessageInfo(message);
- String perResourceTypeKey =
- msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE);
- String perStateTransitionTypeKey =
- msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_STATE_TRANSITION_TYPE);
- if (perStateTransitionTypeKey != null && _executorMap.containsKey(perStateTransitionTypeKey)) {
- LOG.info(String.format("Find per state transition type thread pool for resource %s from %s to %s",
- message.getResourceName(), message.getFromState(), message.getToState()));
- executorService = _executorMap.get(perStateTransitionTypeKey);
- } else if (_executorMap.containsKey(perResourceTypeKey)) {
- LOG.info("Find per-resource thread pool with key: " + perResourceTypeKey);
- executorService = _executorMap.get(perResourceTypeKey);
+ for (int i = Message.MessageInfo.MessageIdentifierBase.values().length - 1; i >= 0; i--) {
+ String msgIdentifer = msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.values()[i]);
+ if (msgIdentifer != null && _executorMap.containsKey(msgIdentifer)) {
+ LOG.info(String.format("Find customized threadpool for %s", msgIdentifer));
+ executorService = _executorMap.get(msgIdentifer);
+ break;
+ }
}
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index b3a7c8884..acf075821 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -103,7 +103,8 @@ public class Message extends HelixProperty {
RELAY_TIME,
RELAY_FROM,
EXPIRY_PERIOD,
- SRC_CLUSTER
+ SRC_CLUSTER,
+ ST_REBALANCE_TYPE
}
/**
@@ -115,6 +116,11 @@ public class Message extends HelixProperty {
UNPROCESSABLE // get exception when create handler
}
+ public enum STRebalanceType {
+ LOAD_REBALANCE,
+ RECOVERY_REBALANCE
+ }
+
// default expiry time period for a relay message.
public static final long RELAY_MESSAGE_DEFAULT_EXPIRY = 5 * 1000; // 5 second
@@ -660,6 +666,15 @@ public class Message extends HelixProperty {
}
}
+ public void setSTRebalanceType(STRebalanceType stRebalanceType) {
+ setAttribute(Attributes.ST_REBALANCE_TYPE, stRebalanceType.name());
+ }
+
+ public STRebalanceType getSTRebalanceType() {
+ String rebalanceTypeStr = getAttribute(Attributes.ST_REBALANCE_TYPE);
+ return rebalanceTypeStr == null ? null : STRebalanceType.valueOf(rebalanceTypeStr);
+ }
+
/**
* Add or change a message attribute
* @param attr {@link Attributes} attribute name
@@ -969,16 +984,24 @@ public class Message extends HelixProperty {
/**
* This class is for categorizing state transition messages based on certain properties, and generating
+ * an identifier string for each category
*/
public static class MessageInfo {
public String _msgType;
public String _resourceName;
public String _fromState;
public String _toState;
+ public STRebalanceType _sTRebalanceType;
+ /**
+ * This class is used for defining what properties are used to categorize messages.
+ * Note that the bases should be arranged in a lower level -> higher level order,
+ * because we do backwards searching when looking for customized threadpool.
+ */
public enum MessageIdentifierBase {
- PER_RESOURCE,
- PER_STATE_TRANSITION_TYPE
+ PER_RESOURCE, // L0: Most basic level, just include message type and resource name
+ PER_REBALANCE_TYPE, // L1: One level above PER_RESOURCE, concatenate rebalance type (load / recovery) after PER_RESOURCE identifier string
+ PER_STATE_TRANSITION_TYPE // L1: One level above PER_RESOURCE, concatenate state transition type (from state, to state) after PER_RESOURCE identifier string
}
public MessageInfo(Message message) {
@@ -986,6 +1009,7 @@ public class Message extends HelixProperty {
_resourceName = message.getResourceName();
_fromState = message.getFromState();
_toState = message.getToState();
+ _sTRebalanceType = message.getSTRebalanceType();
}
public String getMessageIdentifier(MessageIdentifierBase basis) {
@@ -995,6 +1019,13 @@ public class Message extends HelixProperty {
}
String identifier = String.join(delimiter, _msgType, _resourceName);
switch (basis) {
+ case PER_REBALANCE_TYPE:
+ if (_sTRebalanceType == null) {
+ return null;
+ }
+ identifier =
+ String.join(delimiter, identifier, _sTRebalanceType.name());
+ break;
case PER_STATE_TRANSITION_TYPE:
if (_fromState == null || _toState == null) {
return null;
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
index eb4c372a8..69326906b 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import org.apache.helix.messaging.handling.BatchMessageWrapper;
+import org.apache.helix.model.Message;
public abstract class StateModelFactory<T extends StateModel> {
/**
@@ -260,4 +261,42 @@ public abstract class StateModelFactory<T extends StateModel> {
public ExecutorService getExecutorService(String resourceName, String fromState, String toState) {
return null;
}
+
+ public static class CustomizedExecutorService {
+ private final Message.MessageInfo.MessageIdentifierBase _base;
+ private final ExecutorService _executorService;
+
+ public CustomizedExecutorService(Message.MessageInfo.MessageIdentifierBase base, ExecutorService executorService) {
+ if (base == null || executorService == null) {
+ throw new IllegalArgumentException(
+ "MessageIdentifierBase or ExecutorService cannot be null to instantiate a CustomizedExecutorService object");
+ }
+ _base = base;
+ _executorService = executorService;
+ }
+
+ public Message.MessageInfo.MessageIdentifierBase getBase() {
+ return _base;
+ }
+
+ public ExecutorService getExecutorService() {
+ return _executorService;
+ }
+ }
+
+ /**
+ *Get thread pool to handle the given state transition message.
+ * If this method returns null, the threadpool returned from
+ * {@link StateModelFactory#getExecutorService(String resourceName, String fromState, String toState)} will be used;
+ * it this method returns null the threadpool returned from
+ * {@link StateModelFactory#getExecutorService(String resourceName)} will be used.
+ * If that method return null too, then the default shared threadpool will be used.
+ * This method may be called only once for each category of messages,
+ * it will NOT be called during each state transition.
+ * @param msgInfo contains information used to categorize messages to use different threadpools
+ * @return An object contains the MessageIdentifierBase and the assigned threadpool for the input message
+ */
+ public CustomizedExecutorService getExecutorService(Message.MessageInfo msgInfo) {
+ return null;
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index d4b554978..5ce7c55ae 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -1127,6 +1127,8 @@ public class TestHelixTaskExecutor {
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
ThreadPoolExecutor executor1 =
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+ ThreadPoolExecutor executor2 =
+ new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
class MockStateModelFactory_ResourceName
extends StateModelFactory<OnlineOfflineStateModelFactory.OnlineOfflineStateModel> {
@@ -1144,6 +1146,14 @@ public class TestHelixTaskExecutor {
}
}
+ class MockStateModelFactory_MsgInfo
+ extends StateModelFactory<OnlineOfflineStateModelFactory.OnlineOfflineStateModel> {
+ @Override
+ public CustomizedExecutorService getExecutorService(Message.MessageInfo msgInfo) {
+ return new CustomizedExecutorService(Message.MessageInfo.MessageIdentifierBase.PER_REBALANCE_TYPE, executor2);
+ }
+ }
+
System.out.println("START " + TestHelper.getTestMethodName());
String sessionId = UUID.randomUUID().toString();
String resourceName = "testDB";
@@ -1162,11 +1172,11 @@ public class TestHelixTaskExecutor {
when(manager.getClusterName()).thenReturn(TestHelper.getTestMethodName());
StateModel stateModel = new MockMasterSlaveStateModel();
NotificationContext context = new NotificationContext(manager);
- HelixTaskExecutor executor = new HelixTaskExecutor();
Message message = new Message(Message.MessageType.STATE_TRANSITION, msgId);
message.setFromState(fromState);
message.setToState(toState);
message.setResourceName(resourceName);
+ message.setSTRebalanceType(Message.STRebalanceType.LOAD_REBALANCE);
message.setStateModelDef(stateModelDef);
message.setPartitionName("TestPartition");
message.setTgtName("TgtInstance");
@@ -1174,8 +1184,8 @@ public class TestHelixTaskExecutor {
message.setTgtSessionId(sessionId);
// State transition type based
- executor =
- new HelixTaskExecutor(); // Re-initialize it because if the message exists in _taskMap, it won't be assigned again
+ HelixTaskExecutor executor =
+ new HelixTaskExecutor();
StateModelFactory<? extends StateModel> factory = new MockStateModelFactory_STType();
Mockito.doReturn(factory)
.when(engine)
@@ -1189,7 +1199,7 @@ public class TestHelixTaskExecutor {
System.out.println(TestHelper.getTestMethodName() + ": State transition based test passed.");
// Resource name based
- executor = new HelixTaskExecutor();
+ executor = new HelixTaskExecutor(); // Re-initialize it because if the message exists in _taskMap, it won't be assigned again
factory = new MockStateModelFactory_ResourceName();
Mockito.doReturn(factory)
.when(engine)
@@ -1202,6 +1212,22 @@ public class TestHelixTaskExecutor {
return executor0.getTaskCount() == 1;
}, TestHelper.WAIT_DURATION));
System.out.println(TestHelper.getTestMethodName() + ": Resource name based test passed.");
+
+ // Message Info based
+ executor = new HelixTaskExecutor();
+ factory = new MockStateModelFactory_MsgInfo();
+ handler =
+ new HelixStateTransitionHandler(factory, stateModel, message, context, new CurrentState(resourceName));
+ Mockito.doReturn(factory)
+ .when(engine)
+ .getStateModelFactory(stateModelDef, HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+ task = new HelixTask(message, context, handler, executor);
+ executor.scheduleTask(task);
+ Assert.assertTrue(TestHelper.verify(() -> {
+ return executor2.getTaskCount() == 1;
+ }, TestHelper.WAIT_DURATION));
+ System.out.println(TestHelper.getTestMethodName() + ": Message Info based test passed.");
+
System.out.println("END " + TestHelper.getTestMethodName());
}
}