You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/04/05 22:17:15 UTC

helix git commit: Add Test for Batch Message ThreadPool

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x ae3985704 -> 22bee7298


Add Test for Batch Message ThreadPool


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/22bee729
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/22bee729
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/22bee729

Branch: refs/heads/helix-0.6.x
Commit: 22bee7298406136433d83a44becc8a625542b632
Parents: ae39857
Author: Junkai Xue <jx...@linkedin.com>
Authored: Mon Apr 3 17:25:33 2017 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Wed Apr 5 14:53:42 2017 -0700

----------------------------------------------------------------------
 .../messaging/handling/HelixTaskExecutor.java   |  4 +-
 .../handling/TestResourceThreadpoolSize.java    | 48 +++++++++++++++++---
 2 files changed, 44 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/22bee729/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 8d3fea1..8e686ee 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -119,11 +119,11 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   final ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem> _hdlrFtyRegistry;
 
   final ConcurrentHashMap<String, ExecutorService> _executorMap;
-  
+
   /**
    * separate executor for executing batch messages
    */
-  private final ExecutorService _batchMessageExecutorService;
+  final ExecutorService _batchMessageExecutorService;
 
 
   /* Resources whose configuration for dedicate thread pool has been checked.*/

http://git-wip-us.apache.org/repos/asf/helix/blob/22bee729/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
index 33f1c55..81ba71b 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
@@ -24,16 +24,17 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.integration.ZkStandAloneCMTestBase;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.mock.participant.DummyProcess;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.HelixManager;
-import org.apache.helix.integration.ZkStandAloneCMTestBase;
-import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.builder.FullAutoModeISBuilder;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -71,7 +72,7 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
     int configuredPoolSize = 9;
     for (MockParticipantManager participant : _participants) {
       participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
-          new TestOnlineOfflineStateModelFactory(customizedPoolSize), "TestFactory");
+          new TestOnlineOfflineStateModelFactory(customizedPoolSize, 0), "TestFactory");
     }
 
     // add db with default thread pool
@@ -113,6 +114,41 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
     }
   }
 
+  @Test
+  public void testBatchMessageThreadPoolSize() throws InterruptedException {
+    int customizedPoolSize = 5;
+    _participants[0].getStateMachineEngine().registerStateModelFactory("OnlineOffline",
+        new TestOnlineOfflineStateModelFactory(customizedPoolSize, 2000), "TestFactory");
+    for (int i = 1; i < _participants.length; i++) {
+      _participants[i].syncStop();
+    }
+    Thread.sleep(2000L);
+
+    // Add 10 dbs with batch message enabled. Each db has 10 partitions.
+    // So it will have 10 batch messages and each batch message has 10 sub messages.
+    int numberOfDbs = 10;
+    for (int i = 0; i < numberOfDbs; i++) {
+      String dbName = "TestDBABatch" + i;
+      IdealState idealState = new FullAutoModeISBuilder(dbName).setStateModel("OnlineOffline")
+          .setStateModelFactoryName("TestFactory").setNumPartitions(10).setNumReplica(1).build();
+      idealState.setBatchMessageMode(true);
+      _setupTool.getClusterManagementTool().addResource(CLUSTER_NAME, dbName, idealState);
+      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, 1);
+    }
+    Thread.sleep(2000L);
+
+    DefaultMessagingService svc =
+        (DefaultMessagingService) (_participants[0].getMessagingService());
+    HelixTaskExecutor helixExecutor = svc.getExecutor();
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) (helixExecutor._batchMessageExecutorService);
+    Assert.assertNotNull(executor);
+    Assert.assertTrue(executor.getPoolSize() >= numberOfDbs);
+
+    BestPossibleExternalViewVerifier verifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+    Assert.assertTrue(verifier.verify());
+  }
+
   private void setResourceThreadPoolSize(String resourceName, int threadPoolSize) {
     HelixManager manager = _participants[0];
     ConfigAccessor accessor = manager.getConfigAccessor();
@@ -127,8 +163,8 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
     int _threadPoolSize;
     ExecutorService _threadPoolExecutor;
 
-    public TestOnlineOfflineStateModelFactory(int threadPoolSize) {
-      super(0);
+    public TestOnlineOfflineStateModelFactory(int threadPoolSize, int delay) {
+      super(delay);
       if (threadPoolSize > 0) {
         _threadPoolExecutor = Executors.newFixedThreadPool(threadPoolSize);
       }