You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/03/06 18:05:37 UTC

[helix] branch master updated: Add API for users to provide customized threadpool for different categories of state transition messages(#2390)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c2ca7144  Add API for users to provide customized threadpool for different categories of state transition messages(#2390)
1c2ca7144 is described below

commit 1c2ca71441383482d757a05b9d546de29231cab2
Author: Molly Gao <31...@users.noreply.github.com>
AuthorDate: Mon Mar 6 10:05:30 2023 -0800

     Add API for users to provide customized threadpool for different categories of state transition messages(#2390)
    
     Add API for users to provide customized threadpool for different categories of state transition messages
    ---------
    Co-authored-by: Molly Gao <mg...@mgao-mn2.linkedin.biz>
---
 .../stages/IntermediateStateCalcStage.java         |  2 ++
 .../messaging/handling/HelixTaskExecutor.java      | 33 ++++++++++++------
 .../main/java/org/apache/helix/model/Message.java  | 37 ++++++++++++++++++--
 .../statemachine/StateModelFactory.java            | 39 ++++++++++++++++++++++
 .../messaging/handling/TestHelixTaskExecutor.java  | 34 ++++++++++++++++---
 5 files changed, 127 insertions(+), 18 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 835b19be5..d16d6df4c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -379,10 +379,12 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
 
         // Number of states required by StateModelDefinition are not satisfied, need recovery
         if (rebalanceType.equals(RebalanceType.RECOVERY_BALANCE)) {
+          message.setSTRebalanceType(Message.STRebalanceType.RECOVERY_REBALANCE);
           messagesForRecovery.add(message.getId());
           recoveryRebalance(resource, partition, throttleController, message, cache,
               messagesThrottledForRecovery, resourceMessageMap);
         } else if (rebalanceType.equals(RebalanceType.LOAD_BALANCE)) {
+          message.setSTRebalanceType(Message.STRebalanceType.LOAD_REBALANCE);
           messagesForLoad.add(message.getId());
           loadRebalance(resource, partition, throttleController, message, cache,
               onlyDownwardLoadBalance, stateModelDef, messagesThrottledForLoad, resourceMessageMap);
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 4a96f111a..3e715e368 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -163,6 +163,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   /* Resources whose configuration for dedicate thread pool has been checked.*/
   final Set<String> _resourcesThreadpoolChecked;
   final Set<String> _transitionTypeThreadpoolChecked;
+  final Set<String> _msgInfoBasedThreadpoolChecked;
 
   // timer for schedule timeout tasks
   final Timer _timer;
@@ -193,6 +194,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
 
     _resourcesThreadpoolChecked = Collections.newSetFromMap(new ConcurrentHashMap<>());
     _transitionTypeThreadpoolChecked = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    _msgInfoBasedThreadpoolChecked = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
     _lock = new Object();
     _statusUpdateUtil = new StatusUpdateUtil();
@@ -290,6 +292,19 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         manager.getStateMachineEngine().getStateModelFactory(stateModelName, factoryName);
 
     Message.MessageInfo msgInfo = new Message.MessageInfo(message);
+    if (stateModelFactory != null) {
+      StateModelFactory.CustomizedExecutorService customizedExecutorService =
+          stateModelFactory.getExecutorService(msgInfo);
+      if (customizedExecutorService != null) {
+        String msgInfoBasedKey = msgInfo.getMessageIdentifier(customizedExecutorService.getBase());
+        if (msgInfoBasedKey != null) {
+          _msgInfoBasedThreadpoolChecked.add(msgInfoBasedKey);
+          _executorMap.put(msgInfoBasedKey, customizedExecutorService.getExecutorService());
+          return;
+        }
+      }
+    }
+
     String perStateTransitionTypeKey =
         msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_STATE_TRANSITION_TYPE);
     if (perStateTransitionTypeKey != null && stateModelFactory != null && !_transitionTypeThreadpoolChecked.contains(
@@ -361,17 +376,13 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         executorService = _batchMessageExecutorService;
       } else {
         Message.MessageInfo msgInfo = new Message.MessageInfo(message);
-        String perResourceTypeKey =
-            msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE);
-        String perStateTransitionTypeKey =
-            msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_STATE_TRANSITION_TYPE);
-        if (perStateTransitionTypeKey != null && _executorMap.containsKey(perStateTransitionTypeKey)) {
-          LOG.info(String.format("Find per state transition type thread pool for resource %s from %s to %s",
-              message.getResourceName(), message.getFromState(), message.getToState()));
-          executorService = _executorMap.get(perStateTransitionTypeKey);
-        } else if (_executorMap.containsKey(perResourceTypeKey)) {
-          LOG.info("Find per-resource thread pool with key: " + perResourceTypeKey);
-          executorService = _executorMap.get(perResourceTypeKey);
+        for (int i = Message.MessageInfo.MessageIdentifierBase.values().length - 1; i >= 0; i--) {
+          String msgIdentifer = msgInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.values()[i]);
+          if (msgIdentifer != null && _executorMap.containsKey(msgIdentifer)) {
+            LOG.info(String.format("Find customized threadpool for %s", msgIdentifer));
+            executorService = _executorMap.get(msgIdentifer);
+            break;
+          }
         }
       }
     }
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index b3a7c8884..acf075821 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -103,7 +103,8 @@ public class Message extends HelixProperty {
     RELAY_TIME,
     RELAY_FROM,
     EXPIRY_PERIOD,
-    SRC_CLUSTER
+    SRC_CLUSTER,
+    ST_REBALANCE_TYPE
   }
 
   /**
@@ -115,6 +116,11 @@ public class Message extends HelixProperty {
     UNPROCESSABLE // get exception when create handler
   }
 
+  public enum STRebalanceType {
+    LOAD_REBALANCE,
+    RECOVERY_REBALANCE
+  }
+
   // default expiry time period for a relay message.
   public static final long RELAY_MESSAGE_DEFAULT_EXPIRY = 5 * 1000; // 5 second
 
@@ -660,6 +666,15 @@ public class Message extends HelixProperty {
     }
   }
 
+  public void setSTRebalanceType(STRebalanceType stRebalanceType) {
+    setAttribute(Attributes.ST_REBALANCE_TYPE, stRebalanceType.name());
+  }
+
+  public STRebalanceType getSTRebalanceType() {
+    String rebalanceTypeStr = getAttribute(Attributes.ST_REBALANCE_TYPE);
+    return rebalanceTypeStr == null ? null : STRebalanceType.valueOf(rebalanceTypeStr);
+  }
+
   /**
    * Add or change a message attribute
    * @param attr {@link Attributes} attribute name
@@ -969,16 +984,24 @@ public class Message extends HelixProperty {
 
   /**
    * This class is for categorizing state transition messages based on certain properties, and generating
+   * an identifier string for each category
    */
   public static class MessageInfo {
     public String _msgType;
     public String _resourceName;
     public String _fromState;
     public String _toState;
+    public STRebalanceType _sTRebalanceType;
 
+    /**
+     * This class is used for defining what properties are used to categorize messages.
+     * Note that the bases should be arranged in a lower level -> higher level order,
+     * because we do backwards searching when looking for customized threadpool.
+     */
     public enum MessageIdentifierBase {
-      PER_RESOURCE,
-      PER_STATE_TRANSITION_TYPE
+      PER_RESOURCE, // L0: Most basic level, just include message type and resource name
+      PER_REBALANCE_TYPE, // L1: One level above PER_RESOURCE, concatenate rebalance type (load / recovery) after PER_RESOURCE identifier string
+      PER_STATE_TRANSITION_TYPE // L1: One level above PER_RESOURCE, concatenate state transition type (from state, to state) after PER_RESOURCE identifier string
     }
 
     public MessageInfo(Message message) {
@@ -986,6 +1009,7 @@ public class Message extends HelixProperty {
       _resourceName = message.getResourceName();
       _fromState = message.getFromState();
       _toState = message.getToState();
+      _sTRebalanceType = message.getSTRebalanceType();
     }
 
     public String getMessageIdentifier(MessageIdentifierBase basis) {
@@ -995,6 +1019,13 @@ public class Message extends HelixProperty {
       }
       String identifier = String.join(delimiter, _msgType, _resourceName);
       switch (basis) {
+        case PER_REBALANCE_TYPE:
+          if (_sTRebalanceType == null) {
+            return null;
+          }
+          identifier =
+              String.join(delimiter, identifier, _sTRebalanceType.name());
+          break;
         case PER_STATE_TRANSITION_TYPE:
           if (_fromState == null || _toState == null) {
             return null;
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
index eb4c372a8..69326906b 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.helix.messaging.handling.BatchMessageWrapper;
+import org.apache.helix.model.Message;
 
 public abstract class StateModelFactory<T extends StateModel> {
   /**
@@ -260,4 +261,42 @@ public abstract class StateModelFactory<T extends StateModel> {
   public ExecutorService getExecutorService(String resourceName, String fromState, String toState) {
     return null;
   }
+
+  public static class CustomizedExecutorService {
+    private final Message.MessageInfo.MessageIdentifierBase _base;
+    private final ExecutorService _executorService;
+
+    public CustomizedExecutorService(Message.MessageInfo.MessageIdentifierBase base, ExecutorService executorService) {
+      if (base == null || executorService == null) {
+        throw new IllegalArgumentException(
+            "MessageIdentifierBase or ExecutorService cannot be null to instantiate a CustomizedExecutorService object");
+      }
+      _base = base;
+      _executorService = executorService;
+    }
+
+    public Message.MessageInfo.MessageIdentifierBase getBase() {
+      return _base;
+    }
+
+    public ExecutorService getExecutorService() {
+      return _executorService;
+    }
+  }
+
+  /**
+   *Get thread pool to handle the given state transition message.
+   * If this method returns null, the threadpool returned from
+   * {@link StateModelFactory#getExecutorService(String resourceName, String fromState, String toState)} will be used;
+   * it this method returns null the threadpool returned from
+   * {@link StateModelFactory#getExecutorService(String resourceName)} will be used.
+   * If that method return null too, then the default shared threadpool will be used.
+   * This method may be called only once for each category of messages,
+   * it will NOT be called during each state transition.
+   * @param msgInfo contains information used to categorize messages to use different threadpools
+   * @return An object contains the MessageIdentifierBase and the assigned threadpool for the input message
+   */
+  public CustomizedExecutorService getExecutorService(Message.MessageInfo msgInfo) {
+    return null;
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index d4b554978..5ce7c55ae 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -1127,6 +1127,8 @@ public class TestHelixTaskExecutor {
         new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
     ThreadPoolExecutor executor1 =
         new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+    ThreadPoolExecutor executor2 =
+        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
 
     class MockStateModelFactory_ResourceName
         extends StateModelFactory<OnlineOfflineStateModelFactory.OnlineOfflineStateModel> {
@@ -1144,6 +1146,14 @@ public class TestHelixTaskExecutor {
       }
     }
 
+    class MockStateModelFactory_MsgInfo
+        extends StateModelFactory<OnlineOfflineStateModelFactory.OnlineOfflineStateModel> {
+      @Override
+      public CustomizedExecutorService getExecutorService(Message.MessageInfo msgInfo) {
+        return new CustomizedExecutorService(Message.MessageInfo.MessageIdentifierBase.PER_REBALANCE_TYPE, executor2);
+      }
+    }
+
     System.out.println("START " + TestHelper.getTestMethodName());
     String sessionId = UUID.randomUUID().toString();
     String resourceName = "testDB";
@@ -1162,11 +1172,11 @@ public class TestHelixTaskExecutor {
     when(manager.getClusterName()).thenReturn(TestHelper.getTestMethodName());
     StateModel stateModel = new MockMasterSlaveStateModel();
     NotificationContext context = new NotificationContext(manager);
-    HelixTaskExecutor executor = new HelixTaskExecutor();
     Message message = new Message(Message.MessageType.STATE_TRANSITION, msgId);
     message.setFromState(fromState);
     message.setToState(toState);
     message.setResourceName(resourceName);
+    message.setSTRebalanceType(Message.STRebalanceType.LOAD_REBALANCE);
     message.setStateModelDef(stateModelDef);
     message.setPartitionName("TestPartition");
     message.setTgtName("TgtInstance");
@@ -1174,8 +1184,8 @@ public class TestHelixTaskExecutor {
     message.setTgtSessionId(sessionId);
 
     // State transition type based
-    executor =
-        new HelixTaskExecutor(); // Re-initialize it because if the message exists in _taskMap, it won't be assigned again
+    HelixTaskExecutor executor =
+        new HelixTaskExecutor();
     StateModelFactory<? extends StateModel> factory = new MockStateModelFactory_STType();
     Mockito.doReturn(factory)
         .when(engine)
@@ -1189,7 +1199,7 @@ public class TestHelixTaskExecutor {
     System.out.println(TestHelper.getTestMethodName() + ": State transition based test passed.");
 
     // Resource name based
-    executor = new HelixTaskExecutor();
+    executor = new HelixTaskExecutor(); // Re-initialize it because if the message exists in _taskMap, it won't be assigned again
     factory = new MockStateModelFactory_ResourceName();
     Mockito.doReturn(factory)
         .when(engine)
@@ -1202,6 +1212,22 @@ public class TestHelixTaskExecutor {
       return executor0.getTaskCount() == 1;
     }, TestHelper.WAIT_DURATION));
     System.out.println(TestHelper.getTestMethodName() + ": Resource name based test passed.");
+
+    // Message Info based
+    executor = new HelixTaskExecutor();
+    factory = new MockStateModelFactory_MsgInfo();
+    handler =
+        new HelixStateTransitionHandler(factory, stateModel, message, context, new CurrentState(resourceName));
+    Mockito.doReturn(factory)
+        .when(engine)
+        .getStateModelFactory(stateModelDef, HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+    task = new HelixTask(message, context, handler, executor);
+    executor.scheduleTask(task);
+    Assert.assertTrue(TestHelper.verify(() -> {
+      return executor2.getTaskCount() == 1;
+    }, TestHelper.WAIT_DURATION));
+    System.out.println(TestHelper.getTestMethodName() + ": Message Info based test passed.");
+
     System.out.println("END " + TestHelper.getTestMethodName());
   }
 }