You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/06/22 22:57:54 UTC

[35/50] [abbrv] helix git commit: [Helix-656] Support customize batch state transition thread pool

[Helix-656] Support customize batch state transition thread pool

To better support batch message handling, we shall make batch state transition thread pool configurable.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7fc4a8af
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7fc4a8af
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7fc4a8af

Branch: refs/heads/master
Commit: 7fc4a8af8a3e8c4acc2ad6c0b4f7f912cea0369f
Parents: 8d409fc
Author: Junkai Xue <jx...@linkedin.com>
Authored: Wed May 10 12:18:41 2017 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu May 11 17:31:54 2017 -0700

----------------------------------------------------------------------
 .../messaging/handling/HelixTaskExecutor.java   | 19 +++++++++-
 .../org/apache/helix/model/ClusterConfig.java   | 22 ++++++++++-
 .../handling/TestConfigThreadpoolSize.java      | 40 +++++++++++++++++++-
 3 files changed, 76 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7fc4a8af/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 8e686ee..3f6a43d 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -48,6 +48,7 @@ import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
@@ -123,11 +124,12 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   /**
    * separate executor for executing batch messages
    */
-  final ExecutorService _batchMessageExecutorService;
+  ExecutorService _batchMessageExecutorService;
 
 
   /* Resources whose configuration for dedicate thread pool has been checked.*/
   final Set<String> _resourcesThreadpoolChecked;
+  boolean _batchMessageThreadpoolChecked;
 
   // timer for schedule timeout tasks
   final Timer _timer;
@@ -142,7 +144,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
 
     _hdlrFtyRegistry = new ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem>();
     _executorMap = new ConcurrentHashMap<String, ExecutorService>();
-    _batchMessageExecutorService = Executors.newCachedThreadPool();
+    _batchMessageExecutorService = Executors.newFixedThreadPool(DEFAULT_PARALLEL_TASKS);
+    _batchMessageThreadpoolChecked = false;
     _resourcesThreadpoolChecked =
         Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
 
@@ -210,6 +213,18 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
       return;
     }
 
+    if (!_batchMessageThreadpoolChecked) {
+      HelixDataAccessor accessor = manager.getHelixDataAccessor();
+      ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
+      if (clusterConfig != null && clusterConfig.getBatchStateTransitionMaxThreads() > 0) {
+        LOG.info("Customize batch message thread pool with size : " + clusterConfig
+            .getBatchStateTransitionMaxThreads());
+        _batchMessageExecutorService =
+            Executors.newFixedThreadPool(clusterConfig.getBatchStateTransitionMaxThreads());
+      }
+      _batchMessageThreadpoolChecked = true;
+    }
+
     String resourceName = message.getResourceName();
     if (!_resourcesThreadpoolChecked.contains(resourceName)) {
       int threadpoolSize = -1;

http://git-wip-us.apache.org/repos/asf/helix/blob/7fc4a8af/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 23d66a4..2be7ee1 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -35,7 +35,8 @@ public class ClusterConfig extends HelixProperty {
     PERSIST_BEST_POSSIBLE_ASSIGNMENT,
     FAULT_ZONE_TYPE, // the type in which isolation should be applied on when Helix places the replicas from same partition.
     DELAY_REBALANCE_DISABLED,  // enabled the delayed rebalaning in case node goes offline.
-    DELAY_REBALANCE_TIME     // delayed time in ms that the delay time Helix should hold until rebalancing.
+    DELAY_REBALANCE_TIME,     // delayed time in ms that the delay time Helix should hold until rebalancing.
+    BATCH_STATE_TRANSITION_MAX_THREADS
   }
 
   /**
@@ -83,6 +84,25 @@ public class ClusterConfig extends HelixProperty {
     return _record.getBooleanField(ClusterConfigProperty.DELAY_REBALANCE_DISABLED.name(), false);
   }
 
+  /**
+   * Set the customized batch message thread pool size
+   *
+   * @return
+   */
+  public void setBatchStateTransitionMaxThreads(int maxThreads) {
+    _record
+        .setIntField(ClusterConfigProperty.BATCH_STATE_TRANSITION_MAX_THREADS.name(), maxThreads);
+  }
+
+  /**
+   * Get the customized batch message thread pool size
+   *
+   * @return
+   */
+  public int getBatchStateTransitionMaxThreads() {
+    return _record.getIntField(ClusterConfigProperty.BATCH_STATE_TRANSITION_MAX_THREADS.name(), -1);
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof ClusterConfig) {

http://git-wip-us.apache.org/repos/asf/helix/blob/7fc4a8af/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
index c018f4c..385d761 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestConfigThreadpoolSize.java
@@ -23,13 +23,16 @@ import java.util.HashSet;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.helix.ConfigAccessor;
-import org.apache.helix.model.ConfigScope;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.integration.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ConfigScope;
 import org.apache.helix.model.Message;
+import org.apache.helix.model.builder.ConfigScopeBuilder;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -119,4 +122,37 @@ public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase {
       Assert.assertEquals(HelixTaskExecutor.DEFAULT_PARALLEL_TASKS, executor2.getMaximumPoolSize());
     }
   }
+
+  @Test
+  public void testBatchMessageStateTransitionThreadPoolSize() throws InterruptedException {
+    int customizedThreads = 123;
+    for (MockParticipantManager participantManager : _participants) {
+      participantManager.syncStop();
+    }
+
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
+    clusterConfig.setBatchStateTransitionMaxThreads(customizedThreads);
+    accessor.setProperty(accessor.keyBuilder().clusterConfig(), clusterConfig);
+
+    // Since old participants already checked the threadpool, shutdown all of others
+    _setupTool.addInstanceToCluster(CLUSTER_NAME, "TestParticipant");
+    MockParticipantManager newParticipant =
+        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, "TestParticipant");
+    newParticipant.syncStart();
+
+    // Let messsage trigger update thread pool
+    String dbName = "TestDBSubMessageThreadPool";
+    _setupTool.addResourceToCluster(CLUSTER_NAME, dbName, 5, "OnlineOffline");
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, 1);
+
+    Thread.sleep(1000);
+
+    DefaultMessagingService svc = (DefaultMessagingService) (newParticipant.getMessagingService());
+    HelixTaskExecutor helixExecutor = svc.getExecutor();
+    Assert.assertEquals(
+        ((ThreadPoolExecutor) helixExecutor._batchMessageExecutorService).getMaximumPoolSize(),
+        customizedThreads);
+
+  }
 }