You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/08 17:59:37 UTC

[02/38] helix git commit: Support of client's customized threadpool for state-transition message handling.

Support of client's customized threadpool for state-transition message handling.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/eb333702
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/eb333702
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/eb333702

Branch: refs/heads/helix-0.6.x
Commit: eb33370289e2fee521a12f6bae901560bd599f57
Parents: 68e2fdc
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Apr 15 15:38:15 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Sun Feb 5 16:54:44 2017 -0800

----------------------------------------------------------------------
 .../messaging/handling/HelixTaskExecutor.java   | 72 ++++++++++++----
 .../participant/HelixStateMachineEngine.java    |  2 +
 .../helix/participant/StateMachineEngine.java   | 16 ++++
 .../statemachine/StateModelFactory.java         | 12 ++-
 .../handling/TestResourceThreadpoolSize.java    | 89 ++++++++++++++++++--
 5 files changed, 165 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/eb333702/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 59605e4..bc536fd 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -55,6 +55,8 @@ import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.monitoring.ParticipantMonitor;
 import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
 import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.util.StatusUpdateUtil;
 import org.apache.log4j.Logger;
 
@@ -110,7 +112,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
 
   final ConcurrentHashMap<String, ExecutorService> _executorMap;
 
-  final Map<String, Integer> _resourceThreadpoolSizeMap;
+  /* Resources whose configuration for a dedicated thread pool has already been checked. */
+  final Set<String> _resourcesThreadpoolChecked;
 
   // timer for schedule timeout tasks
   final Timer _timer;
@@ -120,7 +123,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
 
     _hdlrFtyRegistry = new ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem>();
     _executorMap = new ConcurrentHashMap<String, ExecutorService>();
-    _resourceThreadpoolSizeMap = new ConcurrentHashMap<String, Integer>();
+    _resourcesThreadpoolChecked =
+        Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
 
     _lock = new Object();
     _statusUpdateUtil = new StatusUpdateUtil();
@@ -174,10 +178,19 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     // start a thread which monitors the completions of task
   }
 
-  void checkResourceConfig(String resourceName, HelixManager manager) {
-    if (!_resourceThreadpoolSizeMap.containsKey(resourceName)) {
+  /** A dedicated thread pool can be provided in the resource configuration or by the client.
+   *  This method checks for one and updates the thread pool if necessary.
+   */
+  private void updateStateTransitionMessageThreadPool(Message message, HelixManager manager) {
+    if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.toString())) {
+      return;
+    }
+
+    String resourceName = message.getResourceName();
+    if (!_resourcesThreadpoolChecked.contains(resourceName)) {
       int threadpoolSize = -1;
       ConfigAccessor configAccessor = manager.getConfigAccessor();
+      // Changes to the configured thread pool size only take effect after the participant is restarted.
       if (configAccessor != null) {
         HelixConfigScope scope =
             new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE)
@@ -189,16 +202,39 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
             threadpoolSize = Integer.parseInt(threadpoolSizeStr);
           }
         } catch (Exception e) {
-          LOG.error("", e);
+          LOG.error(
+              "Failed to parse ThreadPoolSize from resourceConfig for resource" + resourceName, e);
         }
       }
+      String key = MessageType.STATE_TRANSITION.toString() + "." + resourceName;
       if (threadpoolSize > 0) {
-        String key = MessageType.STATE_TRANSITION.toString() + "." + resourceName;
         _executorMap.put(key, Executors.newFixedThreadPool(threadpoolSize));
-        LOG.info("Added per resource threadpool for resource: " + resourceName + " with size: "
+        LOG.info("Added dedicate threadpool for resource: " + resourceName + " with size: "
             + threadpoolSize);
+      } else {
+        // If no thread pool size is configured,
+        // check whether the client specifies a customized thread pool.
+        String factoryName = message.getStateModelFactoryName();
+        String stateModelName = message.getStateModelDef();
+        if (factoryName == null) {
+          factoryName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
+        }
+
+        StateModelFactory<? extends StateModel> stateModelFactory =
+            manager.getStateMachineEngine().getStateModelFactory(stateModelName, factoryName);
+        if (stateModelFactory != null) {
+          ExecutorService executor = stateModelFactory.getExecutorService(resourceName);
+          if (executor != null) {
+            _executorMap.put(key, executor);
+            LOG.info("Added client specified dedicate threadpool for resource: " + key);
+          }
+        } else {
+          LOG.error(String.format(
+              "Fail to get dedicate threadpool defined in stateModelFactory %s: using factoryName: %s for resource %s. No stateModelFactory was found!",
+              stateModelName, factoryName, resourceName));
+        }
       }
-      _resourceThreadpoolSizeMap.put(resourceName, threadpoolSize);
+      _resourcesThreadpoolChecked.add(resourceName);
     }
   }
 
@@ -270,9 +306,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     NotificationContext notificationContext = task.getNotificationContext();
 
     try {
-      if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString())) {
-        checkResourceConfig(message.getResourceName(), notificationContext.getManager());
-      }
+      // Check whether a dedicated thread pool for handling state transition messages is configured or provided.
+      updateStateTransitionMessageThreadPool(message, notificationContext.getManager());
 
       LOG.info("Scheduling message: " + taskId);
       // System.out.println("sched msg: " + message.getPartitionName() + "-"
@@ -351,13 +386,14 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
           _taskMap.remove(taskId);
           return true;
         } else {
-          _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "fail to cancel task: "
-              + taskId, notificationContext.getManager().getHelixDataAccessor());
+          _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class,
+              "fail to cancel task: " + taskId,
+              notificationContext.getManager().getHelixDataAccessor());
         }
       } else {
-        _statusUpdateUtil.logWarning(message, HelixTaskExecutor.class, "fail to cancel task: "
-            + taskId + ", future not found", notificationContext.getManager()
-            .getHelixDataAccessor());
+        _statusUpdateUtil.logWarning(message, HelixTaskExecutor.class,
+            "fail to cancel task: " + taskId + ", future not found",
+            notificationContext.getManager().getHelixDataAccessor());
       }
     }
     return false;
@@ -367,8 +403,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   public void finishTask(MessageTask task) {
     Message message = task.getMessage();
     String taskId = task.getTaskId();
-    LOG.info("message finished: " + taskId + ", took "
-        + (new Date().getTime() - message.getExecuteStartTimeStamp()));
+    LOG.info("message finished: " + taskId + ", took " + (new Date().getTime() - message
+        .getExecuteStartTimeStamp()));
 
     synchronized (_lock) {
       if (_taskMap.containsKey(taskId)) {

http://git-wip-us.apache.org/repos/asf/helix/blob/eb333702/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 3c11cdb..8143b2f 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -63,10 +63,12 @@ public class HelixStateMachineEngine implements StateMachineEngine {
     _stateModelDefs = new ConcurrentHashMap<String, StateModelDefinition>();
   }
 
+  @Override
   public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName) {
     return getStateModelFactory(stateModelName, HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
   }
 
+  @Override
   public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName,
       String factoryName) {
     if (!_stateModelFactoryMap.containsKey(stateModelName)) {

http://git-wip-us.apache.org/repos/asf/helix/blob/eb333702/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
index d11b3cc..ab44e8e 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
@@ -67,4 +67,20 @@ public interface StateMachineEngine extends MessageHandlerFactory {
    */
   public boolean removeStateModelFactory(String stateModelDef,
       StateModelFactory<? extends StateModel> factory, String factoryName);
+
+  /**
+   * Get a default state model factory for a state model definition.
+   * @param stateModelName
+   * @return
+   */
+  public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName);
+
+  /**
+   * Get the state model factory with the given factory name for a state model definition.
+   * @param stateModelName
+   * @param factoryName
+   * @return
+   */
+  public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName,
+      String factoryName);
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/eb333702/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
index 2878620..eb8dd7d 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.helix.messaging.handling.BatchMessageWrapper;
 
@@ -167,7 +168,6 @@ public abstract class StateModelFactory<T extends StateModel> {
 
   /**
    * get resource set
-   * @param resourceName
    * @return resource name set
    */
   public Set<String> getResourceSet() {
@@ -229,4 +229,14 @@ public abstract class StateModelFactory<T extends StateModel> {
     return _batchMsgWrapperMap.get(resourceName);
   }
 
+  /**
+   * Get the customized thread pool to handle all state transition messages for the given resource.
+   * If this method returns null, Helix will use the shared thread pool to handle all messages.
+   *
+   * @param resourceName
+   * @return
+   */
+  public ExecutorService getExecutorService(String resourceName) {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/eb333702/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
index c7f1276..20d9178 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
@@ -19,15 +19,21 @@ package org.apache.helix.messaging.handling;
  * under the License.
  */
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.helix.ConfigAccessor;
-import org.apache.helix.model.ConfigScope;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.mock.participant.DummyProcess;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.builder.AutoRebalanceModeISBuilder;
 import org.apache.helix.HelixManager;
 import org.apache.helix.integration.ZkStandAloneCMTestBase;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -35,11 +41,7 @@ import org.testng.annotations.Test;
 public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
   @Test
   public void TestThreadPoolSizeConfig() {
-    HelixManager manager = _participants[0];
-    ConfigAccessor accessor = manager.getConfigAccessor();
-    ConfigScope scope =
-        new ConfigScopeBuilder().forCluster(manager.getClusterName()).forResource("NextDB").build();
-    accessor.set(scope, HelixTaskExecutor.MAX_THREADS, "" + 12);
+    setResourceThreadPoolSize("NextDB", 12);
 
     _setupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL);
     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "NextDB", 3);
@@ -63,4 +65,77 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
     }
     Assert.assertEquals(taskcount, 64 * 4);
   }
+
+  @Test public void TestCustomizedResourceThreadPool() {
+    int customizedPoolSize = 7;
+    int configuredPoolSize = 9;
+    for (MockParticipantManager participant : _participants) {
+      participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
+          new TestOnlineOfflineStateModelFactory(customizedPoolSize), "TestFactory");
+    }
+
+    // add db with default thread pool
+    _setupTool.addResourceToCluster(CLUSTER_NAME, "TestDB1", 64, STATE_MODEL);
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "TestDB1", 3);
+
+    // add db with customized thread pool
+    IdealState idealState = new AutoRebalanceModeISBuilder("TestDB2").setStateModel("OnlineOffline")
+        .setStateModelFactoryName("TestFactory").setNumPartitions(10).setNumReplica(1).build();
+    _setupTool.getClusterManagementTool().addResource(CLUSTER_NAME, "TestDB2", idealState);
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "TestDB2", 1);
+
+    // add db with configured pool size
+    idealState = new AutoRebalanceModeISBuilder("TestDB3").setStateModel("OnlineOffline")
+        .setStateModelFactoryName("TestFactory").setNumPartitions(10).setNumReplica(1).build();
+    _setupTool.getClusterManagementTool().addResource(CLUSTER_NAME, "TestDB3", idealState);
+    setResourceThreadPoolSize("TestDB3", configuredPoolSize);
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "TestDB3", 1);
+
+    boolean result = ClusterStateVerifier.verifyByPolling(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    for (int i = 0; i < NODE_NR; i++) {
+      DefaultMessagingService svc =
+          (DefaultMessagingService) (_participants[i].getMessagingService());
+      HelixTaskExecutor helixExecutor = svc.getExecutor();
+      ThreadPoolExecutor executor = (ThreadPoolExecutor) (helixExecutor._executorMap
+          .get(MessageType.STATE_TRANSITION + "." + "TestDB1"));
+      Assert.assertNull(executor);
+
+      executor = (ThreadPoolExecutor) (helixExecutor._executorMap
+          .get(MessageType.STATE_TRANSITION + "." + "TestDB2"));
+      Assert.assertEquals(customizedPoolSize, executor.getMaximumPoolSize());
+
+      executor = (ThreadPoolExecutor) (helixExecutor._executorMap
+          .get(MessageType.STATE_TRANSITION + "." + "TestDB3"));
+      Assert.assertEquals(configuredPoolSize, executor.getMaximumPoolSize());
+    }
+  }
+
+  private void setResourceThreadPoolSize(String resourceName, int threadPoolSize) {
+    HelixManager manager = _participants[0];
+    ConfigAccessor accessor = manager.getConfigAccessor();
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE)
+            .forCluster(manager.getClusterName()).forResource(resourceName).build();
+    accessor.set(scope, HelixTaskExecutor.MAX_THREADS, "" + threadPoolSize);
+  }
+
+  public static class TestOnlineOfflineStateModelFactory
+      extends DummyProcess.DummyOnlineOfflineStateModelFactory {
+    int _threadPoolSize;
+    ExecutorService _threadPoolExecutor;
+
+    public TestOnlineOfflineStateModelFactory(int threadPoolSize) {
+      super(0);
+      if (threadPoolSize > 0) {
+        _threadPoolExecutor = Executors.newFixedThreadPool(threadPoolSize);
+      }
+    }
+
+    @Override public ExecutorService getExecutorService(String resourceName) {
+      return _threadPoolExecutor;
+    }
+  }
 }