You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/06/11 00:35:19 UTC

[helix] branch task_pool updated (cadc17f -> d08a8f9)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a change to branch task_pool
in repository https://gitbox.apache.org/repos/asf/helix.git.


    from cadc17f  Fix ReadOnlyWagedRebalancer so that it computes mapping from scratch (#1058)
     new 1f92f31  Allow Configurable Thread Pool Size in TaskStateModelFactory (#973)
     new 35af263  Modify AssignableInstance for Configurable Thread Pool Size (#1009)
     new d08a8f9  Add CRUD endpoints to TaskDriver for configurable thread pool size support (#1011)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../helix/manager/zk/ParticipantManager.java       |   3 +
 .../apache/helix/manager/zk/ZKHelixManager.java    |   6 +-
 .../java/org/apache/helix/model/ClusterConfig.java |  41 ++++++-
 .../org/apache/helix/model/InstanceConfig.java     |  28 ++++-
 .../java/org/apache/helix/model/LiveInstance.java  |  23 +++-
 .../java/org/apache/helix/task/TaskConstants.java  |   7 ++
 .../java/org/apache/helix/task/TaskDriver.java     |  87 +++++++++++++++
 .../apache/helix/task/TaskStateModelFactory.java   |  71 ++++++++++--
 .../main/java/org/apache/helix/task/TaskUtil.java  |  54 +++++++++
 .../helix/task/assigner/AssignableInstance.java    |   4 +-
 .../main/java/org/apache/helix/util/HelixUtil.java |  10 ++
 .../multizk/TestMultiZkHelixJavaApis.java          |   4 +
 .../helix/integration/task/TestTaskThreadLeak.java |   5 +-
 .../helix/manager/zk/TestParticipantManager.java   |  44 ++++++++
 .../helix/manager/zk/TestZkClusterManager.java     |  10 +-
 .../org/apache/helix/model/TestClusterConfig.java  |  24 ++++
 .../org/apache/helix/model/TestInstanceConfig.java |  24 ++++
 .../TestLiveInstance.java}                         |  59 ++++++----
 .../java/org/apache/helix/task/TestTaskDriver.java | 108 ++++++++++++++++++
 .../helix/task/TestTaskStateModelFactory.java      | 124 +++++++++++++++++++++
 .../java/org/apache/helix/task/TestTaskUtil.java   | 118 ++++++++++++++++++++
 .../task/assigner/TestAssignableInstance.java      |  30 +++--
 22 files changed, 826 insertions(+), 58 deletions(-)
 rename helix-core/src/test/java/org/apache/helix/{manager/zk/TestZKLiveInstanceData.java => model/TestLiveInstance.java} (81%)
 create mode 100644 helix-core/src/test/java/org/apache/helix/task/TestTaskDriver.java
 create mode 100644 helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
 create mode 100644 helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java


[helix] 01/03: Allow Configurable Thread Pool Size in TaskStateModelFactory (#973)

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch task_pool
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 1f92f31d894414304dd1a0d3f16d028348ab8a0b
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Tue May 12 20:13:01 2020 -0700

    Allow Configurable Thread Pool Size in TaskStateModelFactory (#973)
    
    This PR changes TaskStateModelFactory logic such that it now obtains the thread pool size from InstanceConfig/ClusterConfig that may be specified by the users. This PR also added setters and getters for InstanceConfig/ClusterConfig for thread pool size fields. This PR also added thread pool size reporting during LiveInstance creation, with setters and getters for LiveInstance.
---
 .../helix/manager/zk/ParticipantManager.java       |   3 +
 .../apache/helix/manager/zk/ZKHelixManager.java    |   6 +-
 .../java/org/apache/helix/model/ClusterConfig.java |  41 ++++++-
 .../org/apache/helix/model/InstanceConfig.java     |  28 ++++-
 .../java/org/apache/helix/model/LiveInstance.java  |  23 +++-
 .../java/org/apache/helix/task/TaskConstants.java  |   7 ++
 .../apache/helix/task/TaskStateModelFactory.java   |  71 ++++++++++--
 .../main/java/org/apache/helix/task/TaskUtil.java  |  54 +++++++++
 .../helix/task/assigner/AssignableInstance.java    |   4 +-
 .../main/java/org/apache/helix/util/HelixUtil.java |  10 ++
 .../multizk/TestMultiZkHelixJavaApis.java          |   4 +
 .../helix/integration/task/TestTaskThreadLeak.java |   5 +-
 .../helix/manager/zk/TestParticipantManager.java   |  44 ++++++++
 .../helix/manager/zk/TestZkClusterManager.java     |  10 +-
 .../org/apache/helix/model/TestClusterConfig.java  |  24 ++++
 .../org/apache/helix/model/TestInstanceConfig.java |  24 ++++
 .../TestLiveInstance.java}                         |  59 ++++++----
 .../helix/task/TestTaskStateModelFactory.java      | 124 +++++++++++++++++++++
 .../java/org/apache/helix/task/TestTaskUtil.java   | 118 ++++++++++++++++++++
 .../task/assigner/TestAssignableInstance.java      |   8 +-
 20 files changed, 615 insertions(+), 52 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 704f247..1720342 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -52,6 +52,7 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -241,6 +242,8 @@ public class ParticipantManager {
     liveInstance.setSessionId(_sessionId);
     liveInstance.setHelixVersion(_manager.getVersion());
     liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
+    liveInstance.setCurrentTaskThreadPoolSize(
+        TaskUtil.getTargetThreadPoolSize(_zkclient, _clusterName, _instanceName));
 
     // LiveInstanceInfoProvider liveInstanceInfoProvider = _manager._liveInstanceInfoProvider;
     if (_liveInstanceInfoProvider != null) {
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index e2429d7..9b1bfdd 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -1331,7 +1331,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
    * ZkConnections.
    */
   private RealmAwareZkClient createSingleRealmZkClient() {
-    final String shardingKey = buildShardingKey();
+    final String shardingKey = HelixUtil.clusterNameToShardingKey(_clusterName);
     PathBasedZkSerializer zkSerializer =
         ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build();
 
@@ -1390,8 +1390,4 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
     return zkClientFactory.buildZkClient(helixZkConnectionConfig, helixZkClientConfig);
   }
-
-  private String buildShardingKey() {
-    return _clusterName.charAt(0) == '/' ? _clusterName : "/" + _clusterName;
-  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 165919c..d149182 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -109,7 +109,14 @@ public class ClusterConfig extends HelixProperty {
     // https://github.com/apache/helix/wiki/Weight-aware-Globally-Evenly-distributed-Rebalancer#rebalance-coordinator
     //
     // Default to be true.
-    GLOBAL_REBALANCE_ASYNC_MODE
+    GLOBAL_REBALANCE_ASYNC_MODE,
+
+    // The target size of task thread pools for each participant. If participants specify their
+    // individual pool sizes in their InstanceConfig's, this value will NOT be used; if participants
+    // don't specify their individual pool sizes, this value will be used for all participants; if
+    // none of participants or the cluster define pool sizes,
+    // TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE will be used to create pool sizes.
+    GLOBAL_TARGET_TASK_THREAD_POOL_SIZE
   }
 
   public enum GlobalRebalancePreferenceKey {
@@ -137,6 +144,7 @@ public class ClusterConfig extends HelixProperty {
   private final static int MAX_REBALANCE_PREFERENCE = 10;
   private final static int MIN_REBALANCE_PREFERENCE = 0;
   public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
+  private static final int GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
 
   /**
    * Instantiate for a specific cluster
@@ -710,6 +718,37 @@ public class ClusterConfig extends HelixProperty {
   }
 
   /**
+   * Get the global target size of task thread pools. This value applies to all participants in
+   * the cluster; it's only used if participants don't specify their individual pool sizes in their
+   * InstanceConfig's. If none of participants or the cluster define pool sizes,
+   * TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE will be used to create pool sizes.
+   * @return the global target size of task thread pool
+   */
+  public int getGlobalTargetTaskThreadPoolSize() {
+    return _record
+        .getIntField(ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(),
+            GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET);
+  }
+
+  /**
+   * Set the global target size of task thread pools for this cluster. This value applies to all
+   * participants in the cluster; it's only used if participants don't specify their individual
+   * pool sizes in their InstanceConfig's. If none of participants or the cluster define pool sizes,
+   * TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE will be used to create pool sizes.
+   * @param globalTargetTaskThreadPoolSize - the new global target task thread pool size
+   * @throws IllegalArgumentException - when the provided new thread pool size is negative
+   */
+  public void setGlobalTargetTaskThreadPoolSize(int globalTargetTaskThreadPoolSize)
+      throws IllegalArgumentException {
+    if (globalTargetTaskThreadPoolSize < 0) {
+      throw new IllegalArgumentException("globalTargetTaskThreadPoolSize must be non-negative!");
+    }
+    _record
+        .setIntField(ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(),
+            globalTargetTaskThreadPoolSize);
+  }
+
+  /**
    * @return The required Instance Capacity Keys. If not configured, return an empty list.
    */
   public List<String> getInstanceCapacityKeys() {
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index 24e6154..010d943 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -56,11 +56,13 @@ public class InstanceConfig extends HelixProperty {
     DOMAIN,
     DELAY_REBALANCE_ENABLED,
     MAX_CONCURRENT_TASK,
-    INSTANCE_CAPACITY_MAP
+    INSTANCE_CAPACITY_MAP,
+    TARGET_TASK_THREAD_POOL_SIZE
   }
 
   public static final int WEIGHT_NOT_SET = -1;
   public static final int MAX_CONCURRENT_TASK_NOT_SET = -1;
+  private static final int TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
 
   private static final Logger _logger = LoggerFactory.getLogger(InstanceConfig.class.getName());
 
@@ -507,6 +509,30 @@ public class InstanceConfig extends HelixProperty {
   }
 
   /**
+   * Get the target size of task thread pool.
+   * @return the target size of task thread pool
+   */
+  public int getTargetTaskThreadPoolSize() {
+    return _record
+        .getIntField(InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name(),
+            TARGET_TASK_THREAD_POOL_SIZE_NOT_SET);
+  }
+
+  /**
+   * Set the target size of task thread pool.
+   * @param targetTaskThreadPoolSize - the new target task thread pool size
+   * @throws IllegalArgumentException - when the provided new thread pool size is negative
+   */
+  public void setTargetTaskThreadPoolSize(int targetTaskThreadPoolSize)
+      throws IllegalArgumentException {
+    if (targetTaskThreadPoolSize < 0) {
+      throw new IllegalArgumentException("targetTaskThreadPoolSize must be non-negative!");
+    }
+    _record.setIntField(InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name(),
+        targetTaskThreadPoolSize);
+  }
+
+  /**
    * Get the instance capacity information from the map fields.
    * @return data map if it exists, or empty map
    */
diff --git a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
index 74260cc..d9945c7 100644
--- a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
@@ -22,6 +22,7 @@ package org.apache.helix.model;
 import java.util.Map;
 
 import org.apache.helix.HelixProperty;
+import org.apache.helix.task.TaskConstants;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +39,8 @@ public class LiveInstance extends HelixProperty {
     HELIX_VERSION,
     LIVE_INSTANCE,
     ZKPROPERTYTRANSFERURL,
-    RESOURCE_CAPACITY
+    RESOURCE_CAPACITY,
+    CURRENT_TASK_THREAD_POOL_SIZE
   }
 
   /**
@@ -190,6 +192,25 @@ public class LiveInstance extends HelixProperty {
     _record.setSimpleField(LiveInstanceProperty.ZKPROPERTYTRANSFERURL.toString(), url);
   }
 
+  /**
+   * Get the current task thread pool size of the instance. For backward compatibility, return
+   * DEFAULT_TASK_THREAD_POOL_SIZE if it's not defined
+   * @return the current task thread pool size
+   */
+  public int getCurrentTaskThreadPoolSize() {
+    return _record.getIntField(LiveInstanceProperty.CURRENT_TASK_THREAD_POOL_SIZE.name(),
+        TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+  }
+
+  /**
+   * Set the current task thread pool size of the instance
+   * @param currentTaskThreadPoolSize the current task thread pool size
+   */
+  public void setCurrentTaskThreadPoolSize(int currentTaskThreadPoolSize) {
+    _record.setIntField(LiveInstanceProperty.CURRENT_TASK_THREAD_POOL_SIZE.name(),
+        currentTaskThreadPoolSize);
+  }
+
   @Override
   public boolean isValid() {
     if (getEphemeralOwner() == null) {
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
index d17e29a..d9745b6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -49,4 +49,11 @@ public class TaskConstants {
   public static final String PREV_RA_NODE = "PreviousResourceAssignment";
 
   public static final boolean DEFAULT_TASK_ENABLE_COMPRESSION = false;
+
+  /**
+   * The default task thread pool size that will be used to create thread pools if target thread
+   * pool sizes are not defined in InstanceConfig or ClusterConfig; also used as the current thread
+   * pool size default value if the current thread pool size is not defined in LiveInstance
+   */
+  public final static int DEFAULT_TASK_THREAD_POOL_SIZE = 40;
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index f30dd9f..90873d5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -19,6 +19,7 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -27,12 +28,22 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.JMException;
 
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.monitoring.mbeans.ThreadPoolExecutorMonitor;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.util.HelixUtil;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
+import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Factory class for {@link TaskStateModel}.
  */
@@ -44,25 +55,31 @@ public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
   private final ScheduledExecutorService _taskExecutor;
   private final ScheduledExecutorService _timerTaskExecutor;
   private ThreadPoolExecutorMonitor _monitor;
-  public final static int TASK_THREADPOOL_SIZE = 40;
 
   public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) {
-    this(manager, taskFactoryRegistry,
-        Executors.newScheduledThreadPool(TASK_THREADPOOL_SIZE, new ThreadFactory() {
-          private AtomicInteger threadId = new AtomicInteger(0);
-
-          @Override
-          public Thread newThread(Runnable r) {
-            return new Thread(r, "TaskStateModelFactory-task_thread-" + threadId.getAndIncrement());
-          }
-        }));
+    this(manager, taskFactoryRegistry, Executors.newScheduledThreadPool(TaskUtil
+        .getTargetThreadPoolSize(createZkClient(manager), manager.getClusterName(),
+            manager.getInstanceName()), new ThreadFactory() {
+      private AtomicInteger threadId = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "TaskStateModelFactory-task_thread-" + threadId.getAndIncrement());
+      }
+    }));
   }
 
+  // DO NOT USE! The size of the provided thread pool will not be reflected to the controller
+  // properly, so the controller may over-schedule tasks to this participant. Task Framework needs
+  // to have full control of the thread pool, unlike the state transition thread pool.
+  @Deprecated
   public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry,
       ScheduledExecutorService taskExecutor) {
     _manager = manager;
     _taskFactoryRegistry = taskFactoryRegistry;
     _taskExecutor = taskExecutor;
+    // TODO: Hunter: I'm not sure why this needs to be a single thread executor. We could certainly
+    // use more threads for timer tasks.
     _timerTaskExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
       @Override
       public Thread newThread(Runnable r) {
@@ -74,7 +91,7 @@ public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
         _monitor = new ThreadPoolExecutorMonitor(TaskConstants.STATE_MODEL_NAME,
             (ThreadPoolExecutor) _taskExecutor);
       } catch (JMException e) {
-        LOG.warn("Error in creating ThreadPoolExecutorMonitor for TaskStateModelFactory.");
+        LOG.warn("Error in creating ThreadPoolExecutorMonitor for TaskStateModelFactory.", e);
       }
     }
   }
@@ -102,4 +119,36 @@ public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Create a RealmAwareZkClient to get thread pool sizes
+   */
+  protected static RealmAwareZkClient createZkClient(HelixManager manager) {
+    // TODO: revisit the logic here - we are creating a connection although we already have a
+    // manager. We cannot use the connection within manager because some users connect the manager
+    // after registering the state model factory (in which case we cannot use manager's connection),
+    // and some connect the manager before registering the state model factory (in which case we
+    // can use manager's connection). We need to think about the right order and determine if we
+    // want to enforce it, which may cause backward incompatibility.
+    RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+        new RealmAwareZkClient.RealmAwareZkClientConfig().setZkSerializer(new ZNRecordSerializer());
+
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+      String clusterName = manager.getClusterName();
+      String shardingKey = HelixUtil.clusterNameToShardingKey(clusterName);
+      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+          new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+              .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+              .setZkRealmShardingKey(shardingKey).build();
+      try {
+        return new FederatedZkClient(connectionConfig, clientConfig);
+      } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+        throw new HelixException("Failed to create FederatedZkClient!", e);
+      }
+    }
+
+    return SharedZkClientFactory.getInstance().buildZkClient(
+        new HelixZkClient.ZkConnectionConfig(manager.getMetadataStoreConnectionString()),
+        clientConfig.createHelixZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 917a69b..c67c14c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -30,11 +30,17 @@ import java.util.Set;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Sets;
 import org.apache.helix.AccessOption;
+import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
@@ -1045,4 +1051,52 @@ public class TaskUtil {
       rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
     }
   }
+
+  /**
+   * Get target thread pool size from InstanceConfig first; if InstanceConfig doesn't exist or the
+   * value is undefined, try ClusterConfig; if the value is undefined in ClusterConfig, fall back
+   * to the default value.
+   * @param zkClient - ZooKeeper connection for config reading
+   * @param clusterName - the cluster name for InstanceConfig and ClusterConfig
+   * @param instanceName - the instance name for InstanceConfig
+   * @return target thread pool size
+   */
+  public static int getTargetThreadPoolSize(RealmAwareZkClient zkClient, String clusterName,
+      String instanceName) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+
+    // Check instance config first for thread pool size
+    if (ZKUtil.isInstanceSetup(zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) {
+      InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName);
+      if (instanceConfig != null) {
+        int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+        // Reject negative values. The pool size is only negative when it's not set in
+        // InstanceConfig, or when the users bypassed the setter logic in InstanceConfig. We treat
+        // negative values as if the value is not set, and continue with ClusterConfig.
+        if (targetTaskThreadPoolSize >= 0) {
+          return targetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn(
+            "Got null as InstanceConfig for instance {} in cluster {}. Continuing with ClusterConfig. ",
+            instanceName, clusterName);
+      }
+    }
+
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    if (clusterConfig != null) {
+      int globalTargetTaskThreadPoolSize = clusterConfig.getGlobalTargetTaskThreadPoolSize();
+      // Reject negative values. The pool size is only negative when it's not set in
+      // ClusterConfig, or when the users bypassed the setter logic in ClusterConfig. We treat
+      // negative values as if the value is not set, and continue with the default value.
+      if (globalTargetTaskThreadPoolSize >= 0) {
+        return globalTargetTaskThreadPoolSize;
+      }
+    } else {
+      LOG.warn("Got null as ClusterConfig for cluster {}. Returning default value: {}. ",
+          clusterName, TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+    }
+
+    return TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
index c1f2ef6..194db41 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
@@ -28,7 +28,7 @@ import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.task.TaskConfig;
-import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,7 +113,7 @@ public class AssignableInstance {
     if (resourceCapacity == null) {
       resourceCapacity = new HashMap<>();
       resourceCapacity.put(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name(),
-          Integer.toString(TaskStateModelFactory.TASK_THREADPOOL_SIZE));
+          Integer.toString(TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE));
       logger.debug("No resource capacity provided in LiveInstance {}, assuming default capacity: {}",
           _instanceConfig.getInstanceName(), resourceCapacity);
     }
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index c1d5ad0..61f473d 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -109,6 +109,16 @@ public final class HelixUtil {
     return path.substring(path.lastIndexOf('/') + 1);
   }
 
+  /**
+   * Convert a cluster name to a sharding key for routing purpose by adding a "/" to the front.
+   * Check if the cluster name already has a "/" at the front; if so just return it.
+   * @param clusterName - cluster name
+   * @return the sharding key corresponding to the cluster name
+   */
+  public static String clusterNameToShardingKey(String clusterName) {
+    return clusterName.charAt(0) == '/' ? clusterName : "/" + clusterName;
+  }
+
   public static String serializeByComma(List<String> objects) {
     return Joiner.on(",").join(objects);
   }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
index d4bdcf9..742bb76 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
@@ -73,6 +73,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
 import org.apache.helix.zookeeper.zkclient.ZkServer;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -149,6 +150,9 @@ public class TestMultiZkHelixJavaApis {
     System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
         "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + msdsNamespace);
 
+    // HttpRoutingDataReader's routing data may be set by other tests using the same endpoint;
+    // reset() for good measure
+    HttpRoutingDataReader.reset();
     // Create a FederatedZkClient for admin work
     _zkClient =
         new FederatedZkClient(new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
index 4e3e289..2f6b034 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
@@ -25,8 +25,8 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowConfig;
 import org.testng.Assert;
@@ -73,7 +73,8 @@ public class TestTaskThreadLeak extends TaskTestBase {
     int threadCountAfter = getThreadCount("TaskStateModelFactory");
 
     Assert.assertTrue(
-        (threadCountAfter - _threadCountBefore) <= TaskStateModelFactory.TASK_THREADPOOL_SIZE + 1);
+        (threadCountAfter - _threadCountBefore) <= TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE
+            + 1);
   }
 
 
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
index a03f4f2..1ebe747 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
@@ -28,7 +28,10 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -145,6 +148,47 @@ public class TestParticipantManager extends ZkTestBase {
     deleteCluster(clusterName);
   }
 
+  @Test(dependsOnMethods = "testSessionExpiryCreateLiveInstance")
+  public void testCurrentTaskThreadPoolSizeCreation() throws Exception {
+    // Using a pool size different from the default value to verify correctness
+    final int testThreadPoolSize = TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
+
+    final String className = TestHelper.getTestClassName();
+    final String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+
+    final ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
+        new ZkBaseDataAccessor.Builder<ZNRecord>().setZkAddress(ZK_ADDR).build());
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        10, // partitions per resource
+        5, // number of nodes
+        3, // replicas
+        "MasterSlave", true); // do rebalance
+
+    final String instanceName = "localhost_12918";
+    final MockParticipantManager manager =
+        new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+
+    InstanceConfig instanceConfig = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
+    instanceConfig.setTargetTaskThreadPoolSize(testThreadPoolSize);
+    accessor.setProperty(keyBuilder.instanceConfig(instanceName), instanceConfig);
+
+    manager.syncStart();
+
+    final LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
+    Assert.assertNotNull(liveInstance);
+    Assert.assertEquals(liveInstance.getCurrentTaskThreadPoolSize(), testThreadPoolSize);
+
+    // Clean up.
+    manager.syncStop();
+    deleteCluster(clusterName);
+  }
+
   /*
    * Mocks PreConnectCallback to insert session expiry during ParticipantManager#handleNewSession()
    */
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
index 9c5b64b..cfbc42d 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
@@ -167,7 +167,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
         accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_0"));
     Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 0);
     Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 0);
-    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 3);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 4);
 
     manager.disconnect();
 
@@ -180,7 +180,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
     liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_1"));
     Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
     Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
-    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 4);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
 
     manager.disconnect();
 
@@ -193,7 +193,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
     liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_2"));
     Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
     Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
-    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 6);
     Assert.assertFalse(liveInstance.getEphemeralOwner().equals("value"));
     Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
 
@@ -208,7 +208,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
     liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_3"));
     Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
     Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
-    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 6);
     Assert.assertFalse(liveInstance.getEphemeralOwner().equals("value"));
     Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
     String sessionId = liveInstance.getEphemeralOwner();
@@ -219,7 +219,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
     liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_3"));
     Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
     Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
-    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 6);
     Assert.assertFalse(liveInstance.getEphemeralOwner().equals("value"));
     Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
     Assert.assertFalse(sessionId.equals(liveInstance.getEphemeralOwner()));
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
index 8e4a016..ac2763b 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -70,6 +70,30 @@ public class TestClusterConfig {
   }
 
   @Test
+  public void testGetGlobalTargetTaskThreadPoolSize() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.getRecord().setIntField(
+        ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(), 100);
+
+    Assert.assertEquals(testConfig.getGlobalTargetTaskThreadPoolSize(), 100);
+  }
+
+  @Test
+  public void testSetGlobalTargetTaskThreadPoolSize() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setGlobalTargetTaskThreadPoolSize(100);
+
+    Assert.assertEquals(testConfig.getRecord().getIntField(
+        ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(), -1), 100);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testSetGlobalTargetTaskThreadPoolSizeIllegalArgument() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setGlobalTargetTaskThreadPoolSize(-1);
+  }
+
+  @Test
   public void testGetRebalancePreference() {
     Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
     preference.put(EVENNESS, 5);
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
index 9b47677..d0b2f58 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
@@ -123,4 +123,28 @@ public class TestInstanceConfig {
     InstanceConfig testConfig = new InstanceConfig("testConfig");
     testConfig.setInstanceCapacityMap(capacityDataMap);
   }
+
+  @Test
+  public void testGetTargetTaskThreadPoolSize() {
+    InstanceConfig testConfig = new InstanceConfig("testConfig");
+    testConfig.getRecord().setIntField(
+        InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name(), 100);
+
+    Assert.assertEquals(testConfig.getTargetTaskThreadPoolSize(), 100);
+  }
+
+  @Test
+  public void testSetTargetTaskThreadPoolSize() {
+    InstanceConfig testConfig = new InstanceConfig("testConfig");
+    testConfig.setTargetTaskThreadPoolSize(100);
+
+    Assert.assertEquals(testConfig.getRecord().getIntField(
+        InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name(), -1), 100);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testSetTargetTaskThreadPoolSizeIllegalArgument() {
+    InstanceConfig testConfig = new InstanceConfig("testConfig");
+    testConfig.setTargetTaskThreadPoolSize(-1);
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java b/helix-core/src/test/java/org/apache/helix/model/TestLiveInstance.java
similarity index 81%
rename from helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
rename to helix-core/src/test/java/org/apache/helix/model/TestLiveInstance.java
index c3aace7..f53f9ec 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestLiveInstance.java
@@ -1,4 +1,4 @@
-package org.apache.helix.manager.zk;
+package org.apache.helix.model;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -35,15 +35,27 @@ import org.apache.helix.LiveInstanceChangeListener;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.model.LiveInstance;
+import org.apache.helix.task.TaskConstants;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-public class TestZKLiveInstanceData extends ZkUnitTestBase {
+public class TestLiveInstance extends ZkUnitTestBase {
   private final String clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
 
+  @BeforeClass()
+  public void beforeClass() throws Exception {
+    _gSetupTool.addCluster(clusterName, true);
+    _gSetupTool
+        .addInstancesToCluster(clusterName, new String[] { "localhost:54321", "localhost:54322" });
+  }
+
+  @AfterClass()
+  public void afterClass() throws Exception {
+    deleteCluster(clusterName);
+  }
+
   @Test
   public void testDataChange() throws Exception {
     // Create an admin and add LiveInstanceChange listener to it
@@ -101,23 +113,6 @@ public class TestZKLiveInstanceData extends ZkUnitTestBase {
     Assert.assertTrue(instances.isEmpty(), "Expecting an empty list of live instance");
 
     adminManager.disconnect();
-
-  }
-
-  @BeforeClass()
-  public void beforeClass() throws Exception {
-    _gSetupTool.addCluster(clusterName, true);
-    _gSetupTool
-        .addInstancesToCluster(clusterName, new String[] { "localhost:54321", "localhost:54322" });
-  }
-
-  @AfterClass()
-  public void afterClass() throws Exception {
-    deleteCluster(clusterName);
-  }
-
-  private String[] getArgs(String... args) {
-    return args;
   }
 
   private List<LiveInstance> deepCopy(List<LiveInstance> instances) {
@@ -127,4 +122,28 @@ public class TestZKLiveInstanceData extends ZkUnitTestBase {
     }
     return result;
   }
+
+  @Test(dependsOnMethods = "testDataChange")
+  public void testGetCurrentTaskThreadPoolSize() {
+    LiveInstance testLiveInstance = new LiveInstance("testId");
+    testLiveInstance.getRecord()
+        .setIntField(LiveInstance.LiveInstanceProperty.CURRENT_TASK_THREAD_POOL_SIZE.name(), 100);
+
+    Assert.assertEquals(testLiveInstance.getCurrentTaskThreadPoolSize(), 100);
+  }
+
+  @Test(dependsOnMethods = "testGetCurrentTaskThreadPoolSize")
+  public void testGetCurrentTaskThreadPoolSizeDefault() {
+    LiveInstance testLiveInstance = new LiveInstance("testId");
+
+    Assert.assertEquals(testLiveInstance.getCurrentTaskThreadPoolSize(), TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testGetCurrentTaskThreadPoolSizeDefault")
+  public void testSetCurrentTaskThreadPoolSize() {
+    LiveInstance testLiveInstance = new LiveInstance("testId");
+    testLiveInstance.setCurrentTaskThreadPoolSize(100);
+
+    Assert.assertEquals(testLiveInstance.getCurrentTaskThreadPoolSize(), 100);
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
new file mode 100644
index 0000000..514256f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
@@ -0,0 +1,124 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestTaskStateModelFactory extends TaskTestBase {
+  // This value has to be different from the default value to verify correctness
+  private static final int TEST_TARGET_TASK_THREAD_POOL_SIZE =
+      TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
+
+  @Test
+  public void testConfigAccessorCreationMultiZk() throws Exception {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    InstanceConfig instanceConfig =
+        InstanceConfig.toInstanceConfig(anyParticipantManager.getInstanceName());
+    instanceConfig.setTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    anyParticipantManager.getConfigAccessor()
+        .setInstanceConfig(anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName(), instanceConfig);
+
+    // Start a msds server
+    // TODO: Refactor all MSDS_SERVER_ENDPOINT creation in system property to one place.
+    // Any test that modifies MSDS_SERVER_ENDPOINT system property and accesses
+    // HttpRoutingDataReader (ex. TestMultiZkHelixJavaApis and this test) will cause the
+    // MSDS_SERVER_ENDPOINT system property to be recorded as final in HttpRoutingDataReader; that
+    // means any test class that satisfies the aforementioned condition and is executed first gets
+    // to "decide" the default msds endpoint. The only workaround is for all these test classes to
+    // use the same default msds endpoint.
+    final String msdsHostName = "localhost";
+    final int msdsPort = 11117;
+    final String msdsNamespace = "multiZkTest";
+    Map<String, Collection<String>> routingData = new HashMap<>();
+    routingData
+        .put(ZK_ADDR, Collections.singletonList("/" + anyParticipantManager.getClusterName()));
+    MockMetadataStoreDirectoryServer msds =
+        new MockMetadataStoreDirectoryServer(msdsHostName, msdsPort, msdsNamespace, routingData);
+    msds.startServer();
+
+    // Save previously-set system configs
+    String prevMultiZkEnabled = System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+    String prevMsdsServerEndpoint =
+        System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
+    // Turn on multiZk mode in System config
+    System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true");
+    // MSDS endpoint: http://localhost:11117/admin/v2/namespaces/multiZkTest
+    System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
+        "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + msdsNamespace);
+
+    HttpRoutingDataReader.reset();
+    RealmAwareZkClient zkClient = TaskStateModelFactory.createZkClient(anyParticipantManager);
+    Assert.assertEquals(TaskUtil
+        .getTargetThreadPoolSize(zkClient, anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName()), TEST_TARGET_TASK_THREAD_POOL_SIZE);
+
+    // Restore system properties
+    if (prevMultiZkEnabled == null) {
+      System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+    } else {
+      System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, prevMultiZkEnabled);
+    }
+    if (prevMsdsServerEndpoint == null) {
+      System.clearProperty(SystemPropertyKeys.MSDS_SERVER_ENDPOINT_KEY);
+    } else {
+      System.setProperty(SystemPropertyKeys.MSDS_SERVER_ENDPOINT_KEY, prevMsdsServerEndpoint);
+    }
+    msds.stopServer();
+  }
+
+  @Test(dependsOnMethods = "testConfigAccessorCreationMultiZk")
+  public void testConfigAccessorCreationSingleZk() {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    // Save previously-set system configs
+    String prevMultiZkEnabled = System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+    // Turn off multiZk mode in System config
+    System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "false");
+
+    RealmAwareZkClient zkClient = TaskStateModelFactory.createZkClient(anyParticipantManager);
+    Assert.assertEquals(TaskUtil
+        .getTargetThreadPoolSize(zkClient, anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName()), TEST_TARGET_TASK_THREAD_POOL_SIZE);
+
+    // Restore system properties
+    if (prevMultiZkEnabled == null) {
+      System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+    } else {
+      System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, prevMultiZkEnabled);
+    }
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
new file mode 100644
index 0000000..a6bfb69
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
@@ -0,0 +1,118 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestTaskUtil extends TaskTestBase {
+  // This value has to be different from the default value to verify correctness
+  private static final int TEST_TARGET_TASK_THREAD_POOL_SIZE =
+      TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
+
+  @Test
+  public void testGetTaskThreadPoolSize() {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    InstanceConfig instanceConfig =
+        InstanceConfig.toInstanceConfig(anyParticipantManager.getInstanceName());
+    instanceConfig.setTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    anyParticipantManager.getConfigAccessor()
+        .setInstanceConfig(anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName(), instanceConfig);
+
+    ClusterConfig clusterConfig = new ClusterConfig(anyParticipantManager.getClusterName());
+    clusterConfig.setGlobalTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE + 1);
+    anyParticipantManager.getConfigAccessor()
+        .setClusterConfig(anyParticipantManager.getClusterName(), clusterConfig);
+
+    Assert.assertEquals(TaskUtil.getTargetThreadPoolSize(anyParticipantManager.getZkClient(),
+        anyParticipantManager.getClusterName(), anyParticipantManager.getInstanceName()),
+        TEST_TARGET_TASK_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testGetTaskThreadPoolSize")
+  public void testGetTaskThreadPoolSizeInstanceConfigUndefined() {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    InstanceConfig instanceConfig =
+        InstanceConfig.toInstanceConfig(anyParticipantManager.getInstanceName());
+    anyParticipantManager.getConfigAccessor()
+        .setInstanceConfig(anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName(), instanceConfig);
+
+    ClusterConfig clusterConfig = new ClusterConfig(anyParticipantManager.getClusterName());
+    clusterConfig.setGlobalTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    anyParticipantManager.getConfigAccessor()
+        .setClusterConfig(anyParticipantManager.getClusterName(), clusterConfig);
+
+    Assert.assertEquals(TaskUtil.getTargetThreadPoolSize(anyParticipantManager.getZkClient(),
+        anyParticipantManager.getClusterName(), anyParticipantManager.getInstanceName()),
+        TEST_TARGET_TASK_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testGetTaskThreadPoolSizeInstanceConfigUndefined")
+  public void testGetTaskThreadPoolSizeInstanceConfigDoesNotExist() {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    HelixDataAccessor helixDataAccessor = anyParticipantManager.getHelixDataAccessor();
+    helixDataAccessor.removeProperty(
+        helixDataAccessor.keyBuilder().instanceConfig(anyParticipantManager.getInstanceName()));
+
+    ClusterConfig clusterConfig = new ClusterConfig(anyParticipantManager.getClusterName());
+    clusterConfig.setGlobalTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    anyParticipantManager.getConfigAccessor()
+        .setClusterConfig(anyParticipantManager.getClusterName(), clusterConfig);
+
+    Assert.assertEquals(TaskUtil.getTargetThreadPoolSize(anyParticipantManager.getZkClient(),
+        anyParticipantManager.getClusterName(), anyParticipantManager.getInstanceName()),
+        TEST_TARGET_TASK_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testGetTaskThreadPoolSizeInstanceConfigDoesNotExist")
+  public void testGetTaskThreadPoolSizeClusterConfigUndefined() {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    ClusterConfig clusterConfig = new ClusterConfig(anyParticipantManager.getClusterName());
+    anyParticipantManager.getConfigAccessor()
+        .setClusterConfig(anyParticipantManager.getClusterName(), clusterConfig);
+
+    Assert.assertEquals(TaskUtil.getTargetThreadPoolSize(anyParticipantManager.getZkClient(),
+        anyParticipantManager.getClusterName(), anyParticipantManager.getInstanceName()),
+        TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testGetTaskThreadPoolSizeClusterConfigUndefined", expectedExceptions = HelixException.class)
+  public void testGetTaskThreadPoolSizeClusterConfigDoesNotExist() {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    HelixDataAccessor helixDataAccessor = anyParticipantManager.getHelixDataAccessor();
+    helixDataAccessor.removeProperty(helixDataAccessor.keyBuilder().clusterConfig());
+    TaskUtil.getTargetThreadPoolSize(anyParticipantManager.getZkClient(),
+        anyParticipantManager.getClusterName(), anyParticipantManager.getInstanceName());
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
index 0e8b1d0..1dac153 100644
--- a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
@@ -26,7 +26,7 @@ import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.task.TaskConfig;
-import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskConstants;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
@@ -66,7 +66,7 @@ public class TestAssignableInstance extends AssignerTestBase {
     Assert.assertEquals(
         (int) ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
             .get(AssignableInstance.DEFAULT_QUOTA_TYPE),
-        TaskStateModelFactory.TASK_THREADPOOL_SIZE);
+        TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
     Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
   }
 
@@ -106,7 +106,7 @@ public class TestAssignableInstance extends AssignerTestBase {
         testQuotaTypes.length);
     Assert.assertEquals(
         ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()),
-        calculateExpectedQuotaPerType(TaskStateModelFactory.TASK_THREADPOOL_SIZE, testQuotaTypes,
+        calculateExpectedQuotaPerType(TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE, testQuotaTypes,
             testQuotaRatio));
     Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
   }
@@ -176,7 +176,7 @@ public class TestAssignableInstance extends AssignerTestBase {
 
     // When nothing is configured, we should use default quota type to assign
     Map<String, TaskAssignResult> results = new HashMap<>();
-    for (int i = 0; i < TaskStateModelFactory.TASK_THREADPOOL_SIZE; i++) {
+    for (int i = 0; i < TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE; i++) {
       String taskId = Integer.toString(i);
       TaskConfig task = new TaskConfig("", null, taskId, null);
       TaskAssignResult result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);


[helix] 03/03: Add CRUD endpoints to TaskDriver for configurable thread pool size support (#1011)

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch task_pool
in repository https://gitbox.apache.org/repos/asf/helix.git

commit d08a8f9a0db2d0de9d4184f7e5f786cd7d33a8d5
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Mon May 18 11:33:58 2020 -0700

    Add CRUD endpoints to TaskDriver for configurable thread pool size support (#1011)
    
    We are adding CRUD endpoints to TaskDriver that support "setting target thread pool sizes", "getting target thread pool sizes", "setting global target thread pool sizes", "getting global target thread pool sizes", and "getting current thread pool sizes".
---
 .../java/org/apache/helix/task/TaskDriver.java     |  87 +++++++++++++++++
 .../java/org/apache/helix/task/TestTaskDriver.java | 108 +++++++++++++++++++++
 2 files changed, 195 insertions(+)

diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 73e23e2..ecce9c2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -41,7 +41,10 @@ import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.helix.store.HelixPropertyStore;
@@ -1152,4 +1155,88 @@ public class TaskDriver {
           "Cannot create more workflows or jobs because there are already too many items created in the path CONFIGS.");
     }
   }
+
+  /**
+   * Get the target task thread pool size of an instance, a value that's used to construct the task
+   * thread pool and is created by users.
+   * @param instanceName - name of the instance
+   * @return the target task thread pool size of the instance
+   */
+  public int getTargetTaskThreadPoolSize(String instanceName) {
+    InstanceConfig instanceConfig = getInstanceConfig(instanceName);
+    return instanceConfig.getTargetTaskThreadPoolSize();
+  }
+
+  /**
+   * Set the target task thread pool size of an instance. The target task thread pool size goes to
+   * InstanceConfig, and is used to construct the task thread pool. The newly-set target task
+   * thread pool size will take effect upon a JVM restart.
+   * @param instanceName - name of the instance
+   * @param targetTaskThreadPoolSize - the target task thread pool size of the instance
+   */
+  public void setTargetTaskThreadPoolSize(String instanceName, int targetTaskThreadPoolSize) {
+    InstanceConfig instanceConfig = getInstanceConfig(instanceName);
+    instanceConfig.setTargetTaskThreadPoolSize(targetTaskThreadPoolSize);
+  }
+
+  private InstanceConfig getInstanceConfig(String instanceName) {
+    InstanceConfig instanceConfig =
+        _accessor.getProperty(_accessor.keyBuilder().instanceConfig(instanceName));
+    if (instanceConfig == null) {
+      throw new IllegalArgumentException(
+          "Failed to find InstanceConfig with provided instance name " + instanceName + "!");
+    }
+    return instanceConfig;
+  }
+
+  /**
+   * Get the global target task thread pool size of the cluster, a value that's used to construct
+   * task thread pools for the cluster's instances and is created by users.
+   * @return the global target task thread pool size of the cluster
+   */
+  public int getGlobalTargetTaskThreadPoolSize() {
+    ClusterConfig clusterConfig = getClusterConfig();
+    return clusterConfig.getGlobalTargetTaskThreadPoolSize();
+  }
+
+  /**
+   * Set the global target task thread pool size of the cluster. The global target task thread pool
+   * size goes to ClusterConfig, and is applied to all instances of the cluster. If an instance
+   * doesn't specify its target thread pool size in InstanceConfig, then this value in ClusterConfig
+   * will be used to construct its task thread pool. The newly-set target task thread pool size will
+   * take effect upon a JVM restart. If neither the global nor the per-instance target thread pool
+   * size is set, a default size will be used.
+   * @param globalTargetTaskThreadPoolSize - the global target task thread pool size of the cluster
+   */
+  public void setGlobalTargetTaskThreadPoolSize(int globalTargetTaskThreadPoolSize) {
+    ClusterConfig clusterConfig = getClusterConfig();
+    clusterConfig.setGlobalTargetTaskThreadPoolSize(globalTargetTaskThreadPoolSize);
+  }
+
+  private ClusterConfig getClusterConfig() {
+    ClusterConfig clusterConfig = _accessor.getProperty(_accessor.keyBuilder().clusterConfig());
+    if (clusterConfig == null) {
+      throw new IllegalStateException(
+          "Failed to find ClusterConfig for cluster " + _clusterName + "!");
+    }
+    return clusterConfig;
+  }
+
+  /**
+   * Get the current task thread pool size of an instance. This value reflects the current
+   * task thread pool size that's already created on the instance, and may be different from the
+   * target thread pool size.
+   * @param instanceName - name of the instance
+   * @return the current task thread pool size of the instance
+   */
+  public int getCurrentTaskThreadPoolSize(String instanceName) {
+    LiveInstance liveInstance =
+        _accessor.getProperty(_accessor.keyBuilder().liveInstance(instanceName));
+    if (liveInstance == null) {
+      throw new IllegalArgumentException(
+          "Failed to find LiveInstance with provided instance name " + instanceName + "!");
+    }
+
+    return liveInstance.getCurrentTaskThreadPoolSize();
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskDriver.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskDriver.java
new file mode 100644
index 0000000..dd0ef3d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskDriver.java
@@ -0,0 +1,108 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests the thread pool size accessors of {@link TaskDriver}: the per-instance target size, the
+ * global (cluster-level) target size, and the current size of an instance.
+ * The tests are chained through dependsOnMethods, so they run in declaration order against the
+ * same cluster and see each other's writes.
+ */
+public class TestTaskDriver extends TaskTestBase {
+  // Use a thread pool size that's different from the default value for test
+  private static final int TEST_THREAD_POOL_SIZE = TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
+  // A second non-default size for the setter tests. It must differ from TEST_THREAD_POOL_SIZE,
+  // which the getter tests have already persisted; otherwise a setter that writes nothing would
+  // still pass.
+  private static final int UPDATED_THREAD_POOL_SIZE = TEST_THREAD_POOL_SIZE + 1;
+  private static final String NON_EXISTENT_INSTANCE_NAME = "NON_EXISTENT_INSTANCE_NAME";
+
+  private TaskDriver _taskDriver;
+  private ConfigAccessor _configAccessor;
+
+  /**
+   * Runs the base class setup, then creates the TaskDriver under test and the ConfigAccessor used
+   * to write and read configs independently of the driver.
+   */
+  @Override
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+
+    _taskDriver = new TaskDriver(_controller);
+    _configAccessor = _controller.getConfigAccessor();
+  }
+
+  /** Writes a target size via ConfigAccessor and expects TaskDriver to read the same value. */
+  @Test
+  public void testGetTargetTaskThreadPoolSize() {
+    String validInstanceName = _participants[0].getInstanceName();
+    InstanceConfig instanceConfig =
+        _configAccessor.getInstanceConfig(CLUSTER_NAME, validInstanceName);
+    instanceConfig.setTargetTaskThreadPoolSize(TEST_THREAD_POOL_SIZE);
+    _configAccessor.setInstanceConfig(CLUSTER_NAME, validInstanceName, instanceConfig);
+
+    Assert.assertEquals(_taskDriver.getTargetTaskThreadPoolSize(validInstanceName),
+        TEST_THREAD_POOL_SIZE);
+  }
+
+  /** Reading the target size of an unknown instance is expected to be rejected. */
+  @Test(dependsOnMethods = "testGetTargetTaskThreadPoolSize",
+      expectedExceptions = IllegalArgumentException.class)
+  public void testGetTargetTaskThreadPoolSizeWrongInstanceName() {
+    _taskDriver.getTargetTaskThreadPoolSize(NON_EXISTENT_INSTANCE_NAME);
+  }
+
+  /**
+   * Writes a target size via TaskDriver and expects ConfigAccessor to read it back. The value
+   * differs from the one stored by testGetTargetTaskThreadPoolSize, so the assertion only holds
+   * if the driver really persisted its write.
+   */
+  @Test(dependsOnMethods = "testGetTargetTaskThreadPoolSizeWrongInstanceName")
+  public void testSetTargetTaskThreadPoolSize() {
+    String validInstanceName = _participants[0].getInstanceName();
+    _taskDriver.setTargetTaskThreadPoolSize(validInstanceName, UPDATED_THREAD_POOL_SIZE);
+    InstanceConfig instanceConfig =
+        _configAccessor.getInstanceConfig(CLUSTER_NAME, validInstanceName);
+
+    Assert.assertEquals(instanceConfig.getTargetTaskThreadPoolSize(), UPDATED_THREAD_POOL_SIZE);
+  }
+
+  /** Setting the target size of an unknown instance is expected to be rejected. */
+  @Test(dependsOnMethods = "testSetTargetTaskThreadPoolSize",
+      expectedExceptions = IllegalArgumentException.class)
+  public void testSetTargetTaskThreadPoolSizeWrongInstanceName() {
+    _taskDriver.setTargetTaskThreadPoolSize(NON_EXISTENT_INSTANCE_NAME, TEST_THREAD_POOL_SIZE);
+  }
+
+  /** Writes a global target size via ConfigAccessor and expects TaskDriver to read it. */
+  @Test(dependsOnMethods = "testSetTargetTaskThreadPoolSizeWrongInstanceName")
+  public void testGetGlobalTargetTaskThreadPoolSize() {
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setGlobalTargetTaskThreadPoolSize(TEST_THREAD_POOL_SIZE);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    Assert.assertEquals(_taskDriver.getGlobalTargetTaskThreadPoolSize(), TEST_THREAD_POOL_SIZE);
+  }
+
+  /**
+   * Writes a global target size via TaskDriver and expects ConfigAccessor to read it back. The
+   * value differs from the one stored by testGetGlobalTargetTaskThreadPoolSize, so the assertion
+   * only holds if the driver really persisted its write.
+   */
+  @Test(dependsOnMethods = "testGetGlobalTargetTaskThreadPoolSize")
+  public void testSetGlobalTargetTaskThreadPoolSize() {
+    _taskDriver.setGlobalTargetTaskThreadPoolSize(UPDATED_THREAD_POOL_SIZE);
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+
+    Assert.assertEquals(clusterConfig.getGlobalTargetTaskThreadPoolSize(),
+        UPDATED_THREAD_POOL_SIZE);
+  }
+
+  /**
+   * Expects the current pool size of a running participant to still be the default, even though
+   * the earlier tests changed the target sizes.
+   */
+  @Test(dependsOnMethods = "testSetGlobalTargetTaskThreadPoolSize")
+  public void testGetCurrentTaskThreadPoolSize() {
+    String validInstanceName = _participants[0].getInstanceName();
+
+    Assert.assertEquals(_taskDriver.getCurrentTaskThreadPoolSize(validInstanceName),
+        TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+  }
+
+  /** Reading the current size of an unknown instance is expected to be rejected. */
+  @Test(dependsOnMethods = "testGetCurrentTaskThreadPoolSize",
+      expectedExceptions = IllegalArgumentException.class)
+  public void testGetCurrentTaskThreadPoolSizeWrongInstanceName() {
+    _taskDriver.getCurrentTaskThreadPoolSize(NON_EXISTENT_INSTANCE_NAME);
+  }
+}


[helix] 02/03: Modify AssignableInstance for Configurable Thread Pool Size (#1009)

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch task_pool
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 35af2639482ecb61adf61e618c8d0b788ac1b790
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Fri May 15 14:26:56 2020 -0700

    Modify AssignableInstance for Configurable Thread Pool Size (#1009)
    
    AssignableInstance used to assign a default value of 40 to its resource capacity (resource capacity has only one field: TASK_EXEC_THREAD). With the recent change related to configurable thread pool size, AssignableInstance should instead respect the thread pool size reported in the LiveInstance.
---
 .../helix/task/assigner/AssignableInstance.java    |  2 +-
 .../task/assigner/TestAssignableInstance.java      | 28 +++++++++++++++-------
 2 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
index 194db41..67199a3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
@@ -113,7 +113,7 @@ public class AssignableInstance {
     if (resourceCapacity == null) {
       resourceCapacity = new HashMap<>();
       resourceCapacity.put(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name(),
-          Integer.toString(TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE));
+          Integer.toString(_liveInstance.getCurrentTaskThreadPoolSize()));
       logger.debug("No resource capacity provided in LiveInstance {}, assuming default capacity: {}",
           _instanceConfig.getInstanceName(), resourceCapacity);
     }
diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
index 1dac153..fbbf06e 100644
--- a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
@@ -55,18 +55,20 @@ public class TestAssignableInstance extends AssignerTestBase {
 
   @Test
   public void testInitializationWithQuotaUnset() {
+    int expectedCurrentTaskThreadPoolSize = 100;
+    LiveInstance liveInstance = createLiveInstance(null, null);
+    liveInstance.setCurrentTaskThreadPoolSize(expectedCurrentTaskThreadPoolSize);
+
     // Initialize AssignableInstance with neither resource capacity nor quota ratio provided
     AssignableInstance ai = new AssignableInstance(createClusterConfig(null, null, false),
-        new InstanceConfig(testInstanceName), createLiveInstance(null, null));
+        new InstanceConfig(testInstanceName), liveInstance);
     Assert.assertEquals(ai.getUsedCapacity().size(), 1);
     Assert.assertEquals(
         (int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
-            .get(AssignableInstance.DEFAULT_QUOTA_TYPE),
-        0);
+            .get(AssignableInstance.DEFAULT_QUOTA_TYPE), 0);
     Assert.assertEquals(
         (int) ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
-            .get(AssignableInstance.DEFAULT_QUOTA_TYPE),
-        TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+            .get(AssignableInstance.DEFAULT_QUOTA_TYPE), expectedCurrentTaskThreadPoolSize);
     Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
   }
 
@@ -91,10 +93,14 @@ public class TestAssignableInstance extends AssignerTestBase {
 
   @Test
   public void testInitializationWithOnlyQuotaType() {
+    int expectedCurrentTaskThreadPoolSize = 100;
+    LiveInstance liveInstance = createLiveInstance(null, null);
+    liveInstance.setCurrentTaskThreadPoolSize(expectedCurrentTaskThreadPoolSize);
+
     // Initialize AssignableInstance with only quota type provided
     AssignableInstance ai =
         new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, false),
-            new InstanceConfig(testInstanceName), createLiveInstance(null, null));
+            new InstanceConfig(testInstanceName), liveInstance);
 
     Assert.assertEquals(ai.getTotalCapacity().size(), 1);
     Assert.assertEquals(ai.getUsedCapacity().size(), 1);
@@ -106,7 +112,7 @@ public class TestAssignableInstance extends AssignerTestBase {
         testQuotaTypes.length);
     Assert.assertEquals(
         ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()),
-        calculateExpectedQuotaPerType(TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE, testQuotaTypes,
+        calculateExpectedQuotaPerType(expectedCurrentTaskThreadPoolSize, testQuotaTypes,
             testQuotaRatio));
     Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
   }
@@ -171,12 +177,16 @@ public class TestAssignableInstance extends AssignerTestBase {
 
   @Test
   public void testNormalTryAssign() {
+    int testCurrentTaskThreadPoolSize = 100;
+    LiveInstance liveInstance = createLiveInstance(null, null);
+    liveInstance.setCurrentTaskThreadPoolSize(testCurrentTaskThreadPoolSize);
+
     AssignableInstance ai = new AssignableInstance(createClusterConfig(null, null, true),
-        new InstanceConfig(testInstanceName), createLiveInstance(null, null));
+        new InstanceConfig(testInstanceName), liveInstance);
 
     // When nothing is configured, we should use default quota type to assign
     Map<String, TaskAssignResult> results = new HashMap<>();
-    for (int i = 0; i < TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE; i++) {
+    for (int i = 0; i < testCurrentTaskThreadPoolSize; i++) {
       String taskId = Integer.toString(i);
       TaskConfig task = new TaskConfig("", null, taskId, null);
       TaskAssignResult result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);