You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/08 17:59:37 UTC
[02/38] helix git commit: Support of client's customized threadpool
for state-transition message handling.
Support of client's customized threadpool for state-transition message handling.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/eb333702
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/eb333702
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/eb333702
Branch: refs/heads/helix-0.6.x
Commit: eb33370289e2fee521a12f6bae901560bd599f57
Parents: 68e2fdc
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Apr 15 15:38:15 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Sun Feb 5 16:54:44 2017 -0800
----------------------------------------------------------------------
.../messaging/handling/HelixTaskExecutor.java | 72 ++++++++++++----
.../participant/HelixStateMachineEngine.java | 2 +
.../helix/participant/StateMachineEngine.java | 16 ++++
.../statemachine/StateModelFactory.java | 12 ++-
.../handling/TestResourceThreadpoolSize.java | 89 ++++++++++++++++++--
5 files changed, 165 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/eb333702/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 59605e4..bc536fd 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -55,6 +55,8 @@ import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.monitoring.ParticipantMonitor;
import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.util.StatusUpdateUtil;
import org.apache.log4j.Logger;
@@ -110,7 +112,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
final ConcurrentHashMap<String, ExecutorService> _executorMap;
- final Map<String, Integer> _resourceThreadpoolSizeMap;
+ /* Resources whose dedicated thread pool configuration has already been checked. */
+ final Set<String> _resourcesThreadpoolChecked;
// timer for schedule timeout tasks
final Timer _timer;
@@ -120,7 +123,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
_hdlrFtyRegistry = new ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem>();
_executorMap = new ConcurrentHashMap<String, ExecutorService>();
- _resourceThreadpoolSizeMap = new ConcurrentHashMap<String, Integer>();
+ _resourcesThreadpoolChecked =
+ Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
_lock = new Object();
_statusUpdateUtil = new StatusUpdateUtil();
@@ -174,10 +178,19 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
// start a thread which monitors the completions of task
}
- void checkResourceConfig(String resourceName, HelixManager manager) {
- if (!_resourceThreadpoolSizeMap.containsKey(resourceName)) {
+ /** A dedicated thread pool can be provided in configuration or by the client.
+ * This method checks for one and updates the thread pool if necessary.
+ */
+ private void updateStateTransitionMessageThreadPool(Message message, HelixManager manager) {
+ if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.toString())) {
+ return;
+ }
+
+ String resourceName = message.getResourceName();
+ if (!_resourcesThreadpoolChecked.contains(resourceName)) {
int threadpoolSize = -1;
ConfigAccessor configAccessor = manager.getConfigAccessor();
+ // Changes to the configured thread pool size only take effect after the participant is restarted.
if (configAccessor != null) {
HelixConfigScope scope =
new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE)
@@ -189,16 +202,39 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
threadpoolSize = Integer.parseInt(threadpoolSizeStr);
}
} catch (Exception e) {
- LOG.error("", e);
+ LOG.error(
+ "Failed to parse ThreadPoolSize from resourceConfig for resource" + resourceName, e);
}
}
+ String key = MessageType.STATE_TRANSITION.toString() + "." + resourceName;
if (threadpoolSize > 0) {
- String key = MessageType.STATE_TRANSITION.toString() + "." + resourceName;
_executorMap.put(key, Executors.newFixedThreadPool(threadpoolSize));
- LOG.info("Added per resource threadpool for resource: " + resourceName + " with size: "
+ LOG.info("Added dedicate threadpool for resource: " + resourceName + " with size: "
+ threadpoolSize);
+ } else {
+ // if threadpool is not configured
+ // check whether client specifies customized threadpool.
+ String factoryName = message.getStateModelFactoryName();
+ String stateModelName = message.getStateModelDef();
+ if (factoryName == null) {
+ factoryName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
+ }
+
+ StateModelFactory<? extends StateModel> stateModelFactory =
+ manager.getStateMachineEngine().getStateModelFactory(stateModelName, factoryName);
+ if (stateModelFactory != null) {
+ ExecutorService executor = stateModelFactory.getExecutorService(resourceName);
+ if (executor != null) {
+ _executorMap.put(key, executor);
+ LOG.info("Added client specified dedicate threadpool for resource: " + key);
+ }
+ } else {
+ LOG.error(String.format(
+ "Fail to get dedicate threadpool defined in stateModelFactory %s: using factoryName: %s for resource %s. No stateModelFactory was found!",
+ stateModelName, factoryName, resourceName));
+ }
}
- _resourceThreadpoolSizeMap.put(resourceName, threadpoolSize);
+ _resourcesThreadpoolChecked.add(resourceName);
}
}
@@ -270,9 +306,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
NotificationContext notificationContext = task.getNotificationContext();
try {
- if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString())) {
- checkResourceConfig(message.getResourceName(), notificationContext.getManager());
- }
+ // Check whether a dedicated thread pool for handling state transition messages is configured or provided.
+ updateStateTransitionMessageThreadPool(message, notificationContext.getManager());
LOG.info("Scheduling message: " + taskId);
// System.out.println("sched msg: " + message.getPartitionName() + "-"
@@ -351,13 +386,14 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
_taskMap.remove(taskId);
return true;
} else {
- _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "fail to cancel task: "
- + taskId, notificationContext.getManager().getHelixDataAccessor());
+ _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class,
+ "fail to cancel task: " + taskId,
+ notificationContext.getManager().getHelixDataAccessor());
}
} else {
- _statusUpdateUtil.logWarning(message, HelixTaskExecutor.class, "fail to cancel task: "
- + taskId + ", future not found", notificationContext.getManager()
- .getHelixDataAccessor());
+ _statusUpdateUtil.logWarning(message, HelixTaskExecutor.class,
+ "fail to cancel task: " + taskId + ", future not found",
+ notificationContext.getManager().getHelixDataAccessor());
}
}
return false;
@@ -367,8 +403,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
public void finishTask(MessageTask task) {
Message message = task.getMessage();
String taskId = task.getTaskId();
- LOG.info("message finished: " + taskId + ", took "
- + (new Date().getTime() - message.getExecuteStartTimeStamp()));
+ LOG.info("message finished: " + taskId + ", took " + (new Date().getTime() - message
+ .getExecuteStartTimeStamp()));
synchronized (_lock) {
if (_taskMap.containsKey(taskId)) {
http://git-wip-us.apache.org/repos/asf/helix/blob/eb333702/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 3c11cdb..8143b2f 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -63,10 +63,12 @@ public class HelixStateMachineEngine implements StateMachineEngine {
_stateModelDefs = new ConcurrentHashMap<String, StateModelDefinition>();
}
+ @Override
public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName) {
return getStateModelFactory(stateModelName, HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
}
+ @Override
public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName,
String factoryName) {
if (!_stateModelFactoryMap.containsKey(stateModelName)) {
http://git-wip-us.apache.org/repos/asf/helix/blob/eb333702/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
index d11b3cc..ab44e8e 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/StateMachineEngine.java
@@ -67,4 +67,20 @@ public interface StateMachineEngine extends MessageHandlerFactory {
*/
public boolean removeStateModelFactory(String stateModelDef,
StateModelFactory<? extends StateModel> factory, String factoryName);
+
+ /**
+ * Get a default state model factory for a state model definition.
+ * @param stateModelName
+ * @return
+ */
+ public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName);
+
+ /**
+ * Get the state model factory with the given factory name for a state model definition.
+ * @param stateModelName
+ * @param factoryName
+ * @return
+ */
+ public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName,
+ String factoryName);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/eb333702/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
index 2878620..eb8dd7d 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import org.apache.helix.messaging.handling.BatchMessageWrapper;
@@ -167,7 +168,6 @@ public abstract class StateModelFactory<T extends StateModel> {
/**
* get resource set
- * @param resourceName
* @return resource name set
*/
public Set<String> getResourceSet() {
@@ -229,4 +229,14 @@ public abstract class StateModelFactory<T extends StateModel> {
return _batchMsgWrapperMap.get(resourceName);
}
+ /**
+ * Get the customized thread pool to handle all state transition messages for the given resource.
+ * If this method returns null, Helix will use the shared thread pool to handle all messages.
+ *
+ * @param resourceName
+ * @return
+ */
+ public ExecutorService getExecutorService(String resourceName) {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/eb333702/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
index c7f1276..20d9178 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
@@ -19,15 +19,21 @@ package org.apache.helix.messaging.handling;
* under the License.
*/
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.helix.ConfigAccessor;
-import org.apache.helix.model.ConfigScope;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.mock.participant.DummyProcess;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.builder.AutoRebalanceModeISBuilder;
import org.apache.helix.HelixManager;
import org.apache.helix.integration.ZkStandAloneCMTestBase;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -35,11 +41,7 @@ import org.testng.annotations.Test;
public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
@Test
public void TestThreadPoolSizeConfig() {
- HelixManager manager = _participants[0];
- ConfigAccessor accessor = manager.getConfigAccessor();
- ConfigScope scope =
- new ConfigScopeBuilder().forCluster(manager.getClusterName()).forResource("NextDB").build();
- accessor.set(scope, HelixTaskExecutor.MAX_THREADS, "" + 12);
+ setResourceThreadPoolSize("NextDB", 12);
_setupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL);
_setupTool.rebalanceStorageCluster(CLUSTER_NAME, "NextDB", 3);
@@ -63,4 +65,77 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
}
Assert.assertEquals(taskcount, 64 * 4);
}
+
+ @Test public void TestCustomizedResourceThreadPool() {
+ int customizedPoolSize = 7;
+ int configuredPoolSize = 9;
+ for (MockParticipantManager participant : _participants) {
+ participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
+ new TestOnlineOfflineStateModelFactory(customizedPoolSize), "TestFactory");
+ }
+
+ // add db with default thread pool
+ _setupTool.addResourceToCluster(CLUSTER_NAME, "TestDB1", 64, STATE_MODEL);
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "TestDB1", 3);
+
+ // add db with customized thread pool
+ IdealState idealState = new AutoRebalanceModeISBuilder("TestDB2").setStateModel("OnlineOffline")
+ .setStateModelFactoryName("TestFactory").setNumPartitions(10).setNumReplica(1).build();
+ _setupTool.getClusterManagementTool().addResource(CLUSTER_NAME, "TestDB2", idealState);
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "TestDB2", 1);
+
+ // add db with configured pool size
+ idealState = new AutoRebalanceModeISBuilder("TestDB3").setStateModel("OnlineOffline")
+ .setStateModelFactoryName("TestFactory").setNumPartitions(10).setNumReplica(1).build();
+ _setupTool.getClusterManagementTool().addResource(CLUSTER_NAME, "TestDB3", idealState);
+ setResourceThreadPoolSize("TestDB3", configuredPoolSize);
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "TestDB3", 1);
+
+ boolean result = ClusterStateVerifier.verifyByPolling(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ for (int i = 0; i < NODE_NR; i++) {
+ DefaultMessagingService svc =
+ (DefaultMessagingService) (_participants[i].getMessagingService());
+ HelixTaskExecutor helixExecutor = svc.getExecutor();
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) (helixExecutor._executorMap
+ .get(MessageType.STATE_TRANSITION + "." + "TestDB1"));
+ Assert.assertNull(executor);
+
+ executor = (ThreadPoolExecutor) (helixExecutor._executorMap
+ .get(MessageType.STATE_TRANSITION + "." + "TestDB2"));
+ Assert.assertEquals(customizedPoolSize, executor.getMaximumPoolSize());
+
+ executor = (ThreadPoolExecutor) (helixExecutor._executorMap
+ .get(MessageType.STATE_TRANSITION + "." + "TestDB3"));
+ Assert.assertEquals(configuredPoolSize, executor.getMaximumPoolSize());
+ }
+ }
+
+ private void setResourceThreadPoolSize(String resourceName, int threadPoolSize) {
+ HelixManager manager = _participants[0];
+ ConfigAccessor accessor = manager.getConfigAccessor();
+ HelixConfigScope scope =
+ new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE)
+ .forCluster(manager.getClusterName()).forResource(resourceName).build();
+ accessor.set(scope, HelixTaskExecutor.MAX_THREADS, "" + threadPoolSize);
+ }
+
+ public static class TestOnlineOfflineStateModelFactory
+ extends DummyProcess.DummyOnlineOfflineStateModelFactory {
+ int _threadPoolSize;
+ ExecutorService _threadPoolExecutor;
+
+ public TestOnlineOfflineStateModelFactory(int threadPoolSize) {
+ super(0);
+ if (threadPoolSize > 0) {
+ _threadPoolExecutor = Executors.newFixedThreadPool(threadPoolSize);
+ }
+ }
+
+ @Override public ExecutorService getExecutorService(String resourceName) {
+ return _threadPoolExecutor;
+ }
+ }
}