You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/04/05 22:17:15 UTC
helix git commit: Add Test for Batch Message ThreadPool
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x ae3985704 -> 22bee7298
Add Test for Batch Message ThreadPool
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/22bee729
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/22bee729
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/22bee729
Branch: refs/heads/helix-0.6.x
Commit: 22bee7298406136433d83a44becc8a625542b632
Parents: ae39857
Author: Junkai Xue <jx...@linkedin.com>
Authored: Mon Apr 3 17:25:33 2017 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Wed Apr 5 14:53:42 2017 -0700
----------------------------------------------------------------------
.../messaging/handling/HelixTaskExecutor.java | 4 +-
.../handling/TestResourceThreadpoolSize.java | 48 +++++++++++++++++---
2 files changed, 44 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/22bee729/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 8d3fea1..8e686ee 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -119,11 +119,11 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
final ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem> _hdlrFtyRegistry;
final ConcurrentHashMap<String, ExecutorService> _executorMap;
-
+
/**
* separate executor for executing batch messages
*/
- private final ExecutorService _batchMessageExecutorService;
+ final ExecutorService _batchMessageExecutorService;
/* Resources whose configuration for dedicate thread pool has been checked.*/
http://git-wip-us.apache.org/repos/asf/helix/blob/22bee729/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
index 33f1c55..81ba71b 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
@@ -24,16 +24,17 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.mock.participant.DummyProcess;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
-import org.apache.helix.HelixManager;
-import org.apache.helix.integration.ZkStandAloneCMTestBase;
-import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.builder.FullAutoModeISBuilder;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -71,7 +72,7 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
int configuredPoolSize = 9;
for (MockParticipantManager participant : _participants) {
participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
- new TestOnlineOfflineStateModelFactory(customizedPoolSize), "TestFactory");
+ new TestOnlineOfflineStateModelFactory(customizedPoolSize, 0), "TestFactory");
}
// add db with default thread pool
@@ -113,6 +114,41 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
}
}
+ @Test
+ public void testBatchMessageThreadPoolSize() throws InterruptedException {
+ int customizedPoolSize = 5;
+ _participants[0].getStateMachineEngine().registerStateModelFactory("OnlineOffline",
+ new TestOnlineOfflineStateModelFactory(customizedPoolSize, 2000), "TestFactory");
+ for (int i = 1; i < _participants.length; i++) {
+ _participants[i].syncStop();
+ }
+ Thread.sleep(2000L);
+
+ // Add 10 dbs with batch message enabled. Each db has 10 partitions.
+ // So it will have 10 batch messages and each batch message has 10 sub messages.
+ int numberOfDbs = 10;
+ for (int i = 0; i < numberOfDbs; i++) {
+ String dbName = "TestDBABatch" + i;
+ IdealState idealState = new FullAutoModeISBuilder(dbName).setStateModel("OnlineOffline")
+ .setStateModelFactoryName("TestFactory").setNumPartitions(10).setNumReplica(1).build();
+ idealState.setBatchMessageMode(true);
+ _setupTool.getClusterManagementTool().addResource(CLUSTER_NAME, dbName, idealState);
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, 1);
+ }
+ Thread.sleep(2000L);
+
+ DefaultMessagingService svc =
+ (DefaultMessagingService) (_participants[0].getMessagingService());
+ HelixTaskExecutor helixExecutor = svc.getExecutor();
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) (helixExecutor._batchMessageExecutorService);
+ Assert.assertNotNull(executor);
+ Assert.assertTrue(executor.getPoolSize() >= numberOfDbs);
+
+ BestPossibleExternalViewVerifier verifier =
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+ Assert.assertTrue(verifier.verify());
+ }
+
private void setResourceThreadPoolSize(String resourceName, int threadPoolSize) {
HelixManager manager = _participants[0];
ConfigAccessor accessor = manager.getConfigAccessor();
@@ -127,8 +163,8 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
int _threadPoolSize;
ExecutorService _threadPoolExecutor;
- public TestOnlineOfflineStateModelFactory(int threadPoolSize) {
- super(0);
+ public TestOnlineOfflineStateModelFactory(int threadPoolSize, int delay) {
+ super(delay);
if (threadPoolSize > 0) {
_threadPoolExecutor = Executors.newFixedThreadPool(threadPoolSize);
}