You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/06/22 22:57:54 UTC
[35/50] [abbrv] helix git commit: [Helix-656] Support customize batch
state transition thread pool
[Helix-656] Support customize batch state transition thread pool
To better support batch message handling, we shall make batch state transition thread pool configurable.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7fc4a8af
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7fc4a8af
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7fc4a8af
Branch: refs/heads/master
Commit: 7fc4a8af8a3e8c4acc2ad6c0b4f7f912cea0369f
Parents: 8d409fc
Author: Junkai Xue <jx...@linkedin.com>
Authored: Wed May 10 12:18:41 2017 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu May 11 17:31:54 2017 -0700
----------------------------------------------------------------------
.../messaging/handling/HelixTaskExecutor.java | 19 +++++++++-
.../org/apache/helix/model/ClusterConfig.java | 22 ++++++++++-
.../handling/TestConfigThreadpoolSize.java | 40 +++++++++++++++++++-
3 files changed, 76 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/7fc4a8af/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 8e686ee..3f6a43d 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -48,6 +48,7 @@ import org.apache.helix.NotificationContext.Type;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
@@ -123,11 +124,12 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
/**
* separate executor for executing batch messages
*/
- final ExecutorService _batchMessageExecutorService;
+ ExecutorService _batchMessageExecutorService;
/* Resources whose configuration for dedicate thread pool has been checked.*/
final Set<String> _resourcesThreadpoolChecked;
+ boolean _batchMessageThreadpoolChecked;
// timer for schedule timeout tasks
final Timer _timer;
@@ -142,7 +144,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
_hdlrFtyRegistry = new ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem>();
_executorMap = new ConcurrentHashMap<String, ExecutorService>();
- _batchMessageExecutorService = Executors.newCachedThreadPool();
+ _batchMessageExecutorService = Executors.newFixedThreadPool(DEFAULT_PARALLEL_TASKS);
+ _batchMessageThreadpoolChecked = false;
_resourcesThreadpoolChecked =
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
@@ -210,6 +213,18 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
return;
}
+ if (!_batchMessageThreadpoolChecked) {
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
+ if (clusterConfig != null && clusterConfig.getBatchStateTransitionMaxThreads() > 0) {
+ LOG.info("Customize batch message thread pool with size : " + clusterConfig
+ .getBatchStateTransitionMaxThreads());
+ _batchMessageExecutorService =
+ Executors.newFixedThreadPool(clusterConfig.getBatchStateTransitionMaxThreads());
+ }
+ _batchMessageThreadpoolChecked = true;
+ }
+
String resourceName = message.getResourceName();
if (!_resourcesThreadpoolChecked.contains(resourceName)) {
int threadpoolSize = -1;
http://git-wip-us.apache.org/repos/asf/helix/blob/7fc4a8af/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 23d66a4..2be7ee1 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -35,7 +35,8 @@ public class ClusterConfig extends HelixProperty {
PERSIST_BEST_POSSIBLE_ASSIGNMENT,
FAULT_ZONE_TYPE, // the type in which isolation should be applied on when Helix places the replicas from same partition.
DELAY_REBALANCE_DISABLED, // enabled the delayed rebalaning in case node goes offline.
- DELAY_REBALANCE_TIME // delayed time in ms that the delay time Helix should hold until rebalancing.
+ DELAY_REBALANCE_TIME, // delayed time in ms that the delay time Helix should hold until rebalancing.
+ BATCH_STATE_TRANSITION_MAX_THREADS
}
/**
@@ -83,6 +84,25 @@ public class ClusterConfig extends HelixProperty {
return _record.getBooleanField(ClusterConfigProperty.DELAY_REBALANCE_DISABLED.name(), false);
}
+ /**
+ * Set the customized batch message thread pool size
+ *
+ * @return
+ */
+ public void setBatchStateTransitionMaxThreads(int maxThreads) {
+ _record
+ .setIntField(ClusterConfigProperty.BATCH_STATE_TRANSITION_MAX_THREADS.name(), maxThreads);
+ }
+
+ /**
+ * Get the customized batch message thread pool size
+ *
+ * @return
+ */
+ public int getBatchStateTransitionMaxThreads() {
+ return _record.getIntField(ClusterConfigProperty.BATCH_STATE_TRANSITION_MAX_THREADS.name(), -1);
+ }
+
@Override
public boolean equals(Object obj) {
if (obj instanceof ClusterConfig) {
http://git-wip-us.apache.org/repos/asf/helix/blob/7fc4a8af/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
index c018f4c..385d761 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
@@ -23,13 +23,16 @@ import java.util.HashSet;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.helix.ConfigAccessor;
-import org.apache.helix.model.ConfigScope;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.integration.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ConfigScope;
import org.apache.helix.model.Message;
+import org.apache.helix.model.builder.ConfigScopeBuilder;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -119,4 +122,37 @@ public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase {
Assert.assertEquals(HelixTaskExecutor.DEFAULT_PARALLEL_TASKS, executor2.getMaximumPoolSize());
}
}
+
+ @Test
+ public void testBatchMessageStateTransitionThreadPoolSize() throws InterruptedException {
+ int customizedThreads = 123;
+ for (MockParticipantManager participantManager : _participants) {
+ participantManager.syncStop();
+ }
+
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
+ clusterConfig.setBatchStateTransitionMaxThreads(customizedThreads);
+ accessor.setProperty(accessor.keyBuilder().clusterConfig(), clusterConfig);
+
+ // Since old participants already checked the threadpool, shutdown all of others
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, "TestParticipant");
+ MockParticipantManager newParticipant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, "TestParticipant");
+ newParticipant.syncStart();
+
+ // Let messsage trigger update thread pool
+ String dbName = "TestDBSubMessageThreadPool";
+ _setupTool.addResourceToCluster(CLUSTER_NAME, dbName, 5, "OnlineOffline");
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, 1);
+
+ Thread.sleep(1000);
+
+ DefaultMessagingService svc = (DefaultMessagingService) (newParticipant.getMessagingService());
+ HelixTaskExecutor helixExecutor = svc.getExecutor();
+ Assert.assertEquals(
+ ((ThreadPoolExecutor) helixExecutor._batchMessageExecutorService).getMaximumPoolSize(),
+ customizedThreads);
+
+ }
}