You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/08/28 20:25:08 UTC

[helix] branch master updated: Configurable Task Thread Pool (#1330)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f5151a  Configurable Task Thread Pool (#1330)
8f5151a is described below

commit 8f5151a466808b2e8ce00c11301ad964dd2d782b
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Fri Aug 28 13:24:56 2020 -0700

    Configurable Task Thread Pool (#1330)
    
    This PR adds the Configurable Task Thread Pool feature to task framework. Users can specify target task pool sizes on either the cluster-level or on the instance-level by setting either GLOBAL_TARGET_TASK_THREAD_POOL_SIZE in ClusterConfig or TARGET_TASK_THREAD_POOL_SIZE in InstanceConfig. After JVM restart, an instance will have a pool with the specified size for task threads.
    
    -----
    
    
    * Allow Configurable Thread Pool Size in TaskStateModelFactory (#973)
    
    This PR changes the TaskStateModelFactory logic so that it obtains the thread pool size from InstanceConfig/ClusterConfig, where users may specify it. It also adds setters and getters for the thread pool size fields in InstanceConfig/ClusterConfig, and adds thread pool size reporting during LiveInstance creation, with a matching setter and getter on LiveInstance.
    
    * Modify AssignableInstance for Configurable Thread Pool Size (#1009)
    
    AssignableInstance used to assign a default value of 40 to its resource capacity (resource capacity has only one field: TASK_EXEC_THREAD). With the recent change related to configurable thread pool size, AssignableInstance should respect the thread pool size reported in LiveInstance.
    
    * Add CRUD endpoints to TaskDriver for configurable thread pool size support (#1011)
    
    We are adding CRUD endpoints to TaskDriver that support "setting target thread pool sizes", "getting target thread pool sizes", "setting global target thread pool sizes", "getting global target thread pool sizes", and "getting current thread pool sizes".
    
    * Configurable Task Pool TaskDriver helpers fix (#1327)
    
    The setters were not actually setting the InstanceConfig/ClusterConfig. Fixed by setting them.
    
    Co-authored-by: Neal Sun <ne...@nesun-mn1.linkedin.biz>
---
 .../helix/manager/zk/ParticipantManager.java       |   3 +
 .../apache/helix/manager/zk/ZKHelixManager.java    |   6 +-
 .../java/org/apache/helix/model/ClusterConfig.java |  41 ++++++-
 .../org/apache/helix/model/InstanceConfig.java     |  28 ++++-
 .../java/org/apache/helix/model/LiveInstance.java  |  23 +++-
 .../java/org/apache/helix/task/TaskConstants.java  |   7 ++
 .../java/org/apache/helix/task/TaskDriver.java     |  89 +++++++++++++++
 .../apache/helix/task/TaskStateModelFactory.java   |  71 ++++++++++--
 .../main/java/org/apache/helix/task/TaskUtil.java  |  54 +++++++++
 .../helix/task/assigner/AssignableInstance.java    |   4 +-
 .../main/java/org/apache/helix/util/HelixUtil.java |  10 ++
 .../multizk/TestMultiZkHelixJavaApis.java          |   2 +
 .../helix/integration/task/TestTaskThreadLeak.java |   5 +-
 .../helix/manager/zk/TestParticipantManager.java   |  44 ++++++++
 .../helix/manager/zk/TestZkClusterManager.java     |  10 +-
 .../org/apache/helix/model/TestClusterConfig.java  |  24 ++++
 .../org/apache/helix/model/TestInstanceConfig.java |  24 ++++
 .../TestLiveInstance.java}                         |  59 ++++++----
 .../java/org/apache/helix/task/TestTaskDriver.java | 100 +++++++++++++++++
 .../helix/task/TestTaskStateModelFactory.java      | 124 +++++++++++++++++++++
 .../java/org/apache/helix/task/TestTaskUtil.java   |  90 +++++++++++++++
 .../task/assigner/TestAssignableInstance.java      |  30 +++--
 22 files changed, 790 insertions(+), 58 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 33e95a1..e91182a 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -53,6 +53,7 @@ import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
 import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -248,6 +249,8 @@ public class ParticipantManager {
     liveInstance.setSessionId(_sessionId);
     liveInstance.setHelixVersion(_manager.getVersion());
     liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
+    liveInstance.setCurrentTaskThreadPoolSize(
+        TaskUtil.getTargetThreadPoolSize(_zkclient, _clusterName, _instanceName));
 
     // LiveInstanceInfoProvider liveInstanceInfoProvider = _manager._liveInstanceInfoProvider;
     if (_liveInstanceInfoProvider != null) {
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index f8967bc..e943985 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -1404,7 +1404,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
    * ZkConnections.
    */
   private RealmAwareZkClient createSingleRealmZkClient() {
-    final String shardingKey = buildShardingKey();
+    final String shardingKey = HelixUtil.clusterNameToShardingKey(_clusterName);
     PathBasedZkSerializer zkSerializer =
         ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build();
 
@@ -1468,10 +1468,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     return zkClientFactory.buildZkClient(helixZkConnectionConfig, helixZkClientConfig);
   }
 
-  private String buildShardingKey() {
-    return _clusterName.charAt(0) == '/' ? _clusterName : "/" + _clusterName;
-  }
-
   /**
    * Check that not both zkAddress and ZkConnectionConfig are set.
    * If zkAddress is not given and ZkConnectionConfig is given, check that ZkConnectionConfig has
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index f6a6481..bfcf137 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -115,7 +115,14 @@ public class ClusterConfig extends HelixProperty {
      * Configure the abnormal partition states resolver classes for the corresponding state model.
      * <State Model Def Name, Full Path of the Resolver Class Name>
      */
-    ABNORMAL_STATES_RESOLVER_MAP
+    ABNORMAL_STATES_RESOLVER_MAP,
+
+    // The target size of task thread pools for each participant. If participants specify their
+    // individual pool sizes in their InstanceConfig's, this value will NOT be used; if participants
+    // don't specify their individual pool sizes, this value will be used for all participants; if
+    // none of participants or the cluster define pool sizes,
+    // TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE will be used to create pool sizes.
+    GLOBAL_TARGET_TASK_THREAD_POOL_SIZE
   }
 
   public enum GlobalRebalancePreferenceKey {
@@ -143,6 +150,7 @@ public class ClusterConfig extends HelixProperty {
   private final static int MAX_REBALANCE_PREFERENCE = 10;
   private final static int MIN_REBALANCE_PREFERENCE = 0;
   public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
+  private static final int GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
 
   /**
    * Instantiate for a specific cluster
@@ -717,6 +725,37 @@ public class ClusterConfig extends HelixProperty {
   }
 
   /**
+   * Get the global target size of task thread pools. This value applies to all participants in
+   * the cluster; it's only used if participants don't specify their individual pool sizes in their
+   * InstanceConfig's. If none of participants or the cluster define pool sizes,
+   * TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE will be used to create pool sizes.
+   * @return the global target size of task thread pool
+   */
+  public int getGlobalTargetTaskThreadPoolSize() {
+    return _record
+        .getIntField(ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(),
+            GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET);
+  }
+
+  /**
+   * Set the global target size of task thread pools for this cluster. This value applies to all
+   * participants in the cluster; it's only used if participants don't specify their individual
+   * pool sizes in their InstanceConfig's. If none of participants or the cluster define pool sizes,
+   * TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE will be used to create pool sizes.
+   * @param globalTargetTaskThreadPoolSize - the new global target task thread pool size
+   * @throws IllegalArgumentException - when the provided new thread pool size is negative
+   */
+  public void setGlobalTargetTaskThreadPoolSize(int globalTargetTaskThreadPoolSize)
+      throws IllegalArgumentException {
+    if (globalTargetTaskThreadPoolSize < 0) {
+      throw new IllegalArgumentException("globalTargetTaskThreadPoolSize must be non-negative!");
+    }
+    _record
+        .setIntField(ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(),
+            globalTargetTaskThreadPoolSize);
+  }
+
+  /**
    * @return The required Instance Capacity Keys. If not configured, return an empty list.
    */
   public List<String> getInstanceCapacityKeys() {
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index cbcab29..143b610 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -56,11 +56,13 @@ public class InstanceConfig extends HelixProperty {
     DOMAIN,
     DELAY_REBALANCE_ENABLED,
     MAX_CONCURRENT_TASK,
-    INSTANCE_CAPACITY_MAP
+    INSTANCE_CAPACITY_MAP,
+    TARGET_TASK_THREAD_POOL_SIZE
   }
 
   public static final int WEIGHT_NOT_SET = -1;
   public static final int MAX_CONCURRENT_TASK_NOT_SET = -1;
+  private static final int TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
 
   private static final Logger _logger = LoggerFactory.getLogger(InstanceConfig.class.getName());
 
@@ -517,6 +519,30 @@ public class InstanceConfig extends HelixProperty {
   }
 
   /**
+   * Get the target size of task thread pool.
+   * @return the target size of task thread pool
+   */
+  public int getTargetTaskThreadPoolSize() {
+    return _record
+        .getIntField(InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name(),
+            TARGET_TASK_THREAD_POOL_SIZE_NOT_SET);
+  }
+
+  /**
+   * Set the target size of task thread pool.
+   * @param targetTaskThreadPoolSize - the new target task thread pool size
+   * @throws IllegalArgumentException - when the provided new thread pool size is negative
+   */
+  public void setTargetTaskThreadPoolSize(int targetTaskThreadPoolSize)
+      throws IllegalArgumentException {
+    if (targetTaskThreadPoolSize < 0) {
+      throw new IllegalArgumentException("targetTaskThreadPoolSize must be non-negative!");
+    }
+    _record.setIntField(InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name(),
+        targetTaskThreadPoolSize);
+  }
+
+  /**
    * Get the instance capacity information from the map fields.
    * @return data map if it exists, or empty map
    */
diff --git a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
index 8037fc9..0adccdd 100644
--- a/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
@@ -22,6 +22,7 @@ package org.apache.helix.model;
 import java.util.Map;
 
 import org.apache.helix.HelixProperty;
+import org.apache.helix.task.TaskConstants;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +39,8 @@ public class LiveInstance extends HelixProperty {
     HELIX_VERSION,
     LIVE_INSTANCE,
     ZKPROPERTYTRANSFERURL,
-    RESOURCE_CAPACITY
+    RESOURCE_CAPACITY,
+    CURRENT_TASK_THREAD_POOL_SIZE
   }
 
   /**
@@ -190,6 +192,25 @@ public class LiveInstance extends HelixProperty {
     _record.setSimpleField(LiveInstanceProperty.ZKPROPERTYTRANSFERURL.toString(), url);
   }
 
+  /**
+   * Get the current task thread pool size of the instance. For backward compatibility, return
+   * DEFAULT_TASK_THREAD_POOL_SIZE if it's not defined
+   * @return the current task thread pool size
+   */
+  public int getCurrentTaskThreadPoolSize() {
+    return _record.getIntField(LiveInstanceProperty.CURRENT_TASK_THREAD_POOL_SIZE.name(),
+        TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+  }
+
+  /**
+   * Set the current task thread pool size of the instance
+   * @param currentTaskThreadPoolSize the current task thread pool size
+   */
+  public void setCurrentTaskThreadPoolSize(int currentTaskThreadPoolSize) {
+    _record.setIntField(LiveInstanceProperty.CURRENT_TASK_THREAD_POOL_SIZE.name(),
+        currentTaskThreadPoolSize);
+  }
+
   @Override
   public boolean isValid() {
     if (getEphemeralOwner() == null) {
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
index 4ee7941..04d8298 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -49,4 +49,11 @@ public class TaskConstants {
   public static final String PREV_RA_NODE = "PreviousResourceAssignment";
 
   public static final boolean DEFAULT_TASK_ENABLE_COMPRESSION = false;
+
+  /**
+   * The default task thread pool size that will be used to create thread pools if target thread
+   * pool sizes are not defined in InstanceConfig or ClusterConfig; also used as the current thread
+   * pool size default value if the current thread pool size is not defined in LiveInstance
+   */
+  public final static int DEFAULT_TASK_THREAD_POOL_SIZE = 40;
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 506eca9..fb11721 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -41,7 +41,10 @@ import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.helix.store.HelixPropertyStore;
@@ -1152,4 +1155,90 @@ public class TaskDriver {
           "Cannot create more workflows or jobs because there are already too many items created in the path CONFIGS.");
     }
   }
+
+  /**
+   * Get the target task thread pool size of an instance, a value that's used to construct the task
+   * thread pool and is created by users.
+   * @param instanceName - name of the instance
+   * @return the target task thread pool size of the instance
+   */
+  public int getTargetTaskThreadPoolSize(String instanceName) {
+    InstanceConfig instanceConfig = getInstanceConfig(instanceName);
+    return instanceConfig.getTargetTaskThreadPoolSize();
+  }
+
+  /**
+   * Set the target task thread pool size of an instance. The target task thread pool size goes to
+   * InstanceConfig, and is used to construct the task thread pool. The newly-set target task
+   * thread pool size will take effect upon a JVM restart.
+   * @param instanceName - name of the instance
+   * @param targetTaskThreadPoolSize - the target task thread pool size of the instance
+   */
+  public void setTargetTaskThreadPoolSize(String instanceName, int targetTaskThreadPoolSize) {
+    InstanceConfig instanceConfig = getInstanceConfig(instanceName);
+    instanceConfig.setTargetTaskThreadPoolSize(targetTaskThreadPoolSize);
+    _accessor.setProperty(_accessor.keyBuilder().instanceConfig(instanceName), instanceConfig);
+  }
+
+  private InstanceConfig getInstanceConfig(String instanceName) {
+    InstanceConfig instanceConfig =
+        _accessor.getProperty(_accessor.keyBuilder().instanceConfig(instanceName));
+    if (instanceConfig == null) {
+      throw new IllegalArgumentException(
+          "Failed to find InstanceConfig with provided instance name " + instanceName + "!");
+    }
+    return instanceConfig;
+  }
+
+  /**
+   * Get the global target task thread pool size of the cluster, a value that's used to construct
+   * task thread pools for the cluster's instances and is created by users.
+   * @return the global target task thread pool size of the cluster
+   */
+  public int getGlobalTargetTaskThreadPoolSize() {
+    ClusterConfig clusterConfig = getClusterConfig();
+    return clusterConfig.getGlobalTargetTaskThreadPoolSize();
+  }
+
+  /**
+   * Set the global target task thread pool size of the cluster. The global target task thread pool
+   * size goes to ClusterConfig, and is applied to all instances of the cluster. If an instance
+   * doesn't specify its target thread pool size in InstanceConfig, then this value in ClusterConfig
+   * will be used to construct its task thread pool. The newly-set target task thread pool size will
+   * take effect upon a JVM restart. If none of the global and per-instance target thread pool sizes
+   * are set, a default size will be used.
+   * @param globalTargetTaskThreadPoolSize - the global target task thread pool size of the cluster
+   */
+  public void setGlobalTargetTaskThreadPoolSize(int globalTargetTaskThreadPoolSize) {
+    ClusterConfig clusterConfig = getClusterConfig();
+    clusterConfig.setGlobalTargetTaskThreadPoolSize(globalTargetTaskThreadPoolSize);
+    _accessor.setProperty(_accessor.keyBuilder().clusterConfig(), clusterConfig);
+  }
+
+  private ClusterConfig getClusterConfig() {
+    ClusterConfig clusterConfig = _accessor.getProperty(_accessor.keyBuilder().clusterConfig());
+    if (clusterConfig == null) {
+      throw new IllegalStateException(
+          "Failed to find ClusterConfig for cluster " + _clusterName + "!");
+    }
+    return clusterConfig;
+  }
+
+  /**
+   * Get the current task thread pool size of an instance. This value reflects the current
+   * task thread pool size that's already created on the instance, and may be different from the
+   * target thread pool size.
+   * @param instanceName - name of the instance
+   * @return the current task thread pool size of the instance
+   */
+  public int getCurrentTaskThreadPoolSize(String instanceName) {
+    LiveInstance liveInstance =
+        _accessor.getProperty(_accessor.keyBuilder().liveInstance(instanceName));
+    if (liveInstance == null) {
+      throw new IllegalArgumentException(
+          "Failed to find LiveInstance with provided instance name " + instanceName + "!");
+    }
+
+    return liveInstance.getCurrentTaskThreadPoolSize();
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index 5818bbd..215c895 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -19,6 +19,7 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -28,12 +29,22 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.JMException;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.monitoring.mbeans.ThreadPoolExecutorMonitor;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.util.HelixUtil;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
+import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Factory class for {@link TaskStateModel}.
  */
@@ -45,25 +56,31 @@ public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
   private final ScheduledExecutorService _taskExecutor;
   private final ScheduledExecutorService _timerTaskExecutor;
   private ThreadPoolExecutorMonitor _monitor;
-  public final static int TASK_THREADPOOL_SIZE = 40;
 
   public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) {
-    this(manager, taskFactoryRegistry,
-        Executors.newScheduledThreadPool(TASK_THREADPOOL_SIZE, new ThreadFactory() {
-          private AtomicInteger threadId = new AtomicInteger(0);
-
-          @Override
-          public Thread newThread(Runnable r) {
-            return new Thread(r, "TaskStateModelFactory-task_thread-" + threadId.getAndIncrement());
-          }
-        }));
+    this(manager, taskFactoryRegistry, Executors.newScheduledThreadPool(TaskUtil
+        .getTargetThreadPoolSize(createZkClient(manager), manager.getClusterName(),
+            manager.getInstanceName()), new ThreadFactory() {
+      private AtomicInteger threadId = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "TaskStateModelFactory-task_thread-" + threadId.getAndIncrement());
+      }
+    }));
   }
 
+  // DO NOT USE! The size of the provided thread pool will not be reflected to the controller
+  // properly, so the controller may over-schedule tasks to this participant. Task Framework needs
+  // to have full control of the thread pool, unlike the state transition thread pool.
+  @Deprecated
   public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry,
       ScheduledExecutorService taskExecutor) {
     _manager = manager;
     _taskFactoryRegistry = taskFactoryRegistry;
     _taskExecutor = taskExecutor;
+    // TODO: Hunter: I'm not sure why this needs to be a single thread executor. We could certainly
+    // use more threads for timer tasks.
     _timerTaskExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
       @Override
       public Thread newThread(Runnable r) {
@@ -75,7 +92,7 @@ public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
         _monitor = new ThreadPoolExecutorMonitor(TaskConstants.STATE_MODEL_NAME,
             (ThreadPoolExecutor) _taskExecutor);
       } catch (JMException e) {
-        LOG.warn("Error in creating ThreadPoolExecutorMonitor for TaskStateModelFactory.");
+        LOG.warn("Error in creating ThreadPoolExecutorMonitor for TaskStateModelFactory.", e);
       }
     }
   }
@@ -112,4 +129,36 @@ public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Create a RealmAwareZkClient to get thread pool sizes
+   */
+  protected static RealmAwareZkClient createZkClient(HelixManager manager) {
+    // TODO: revisit the logic here - we are creating a connection although we already have a
+    // manager. We cannot use the connection within manager because some users connect the manager
+    // after registering the state model factory (in which case we cannot use manager's connection),
+    // and some connect the manager before registering the state model factory (in which case we
+    // can use manager's connection). We need to think about the right order and determine if we
+    // want to enforce it, which may cause backward incompatibility.
+    RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+        new RealmAwareZkClient.RealmAwareZkClientConfig().setZkSerializer(new ZNRecordSerializer());
+
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+      String clusterName = manager.getClusterName();
+      String shardingKey = HelixUtil.clusterNameToShardingKey(clusterName);
+      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+          new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+              .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+              .setZkRealmShardingKey(shardingKey).build();
+      try {
+        return new FederatedZkClient(connectionConfig, clientConfig);
+      } catch (InvalidRoutingDataException | IllegalArgumentException e) {
+        throw new HelixException("Failed to create FederatedZkClient!", e);
+      }
+    }
+
+    return SharedZkClientFactory.getInstance().buildZkClient(
+        new HelixZkClient.ZkConnectionConfig(manager.getMetadataStoreConnectionString()),
+        clientConfig.createHelixZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index a5a6086..e8d4571 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -33,18 +33,24 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Sets;
 import org.apache.helix.AccessOption;
+import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.manager.zk.ZKUtil;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.util.RebalanceUtil;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.slf4j.Logger;
@@ -1123,4 +1129,52 @@ public class TaskUtil {
       }
     }
   }
+
+  /**
+   * Get target thread pool size from InstanceConfig first; if InstanceConfig doesn't exist or the
+   * value is undefined, try ClusterConfig; if the value is undefined in ClusterConfig, fall back
+   * to the default value.
+   * @param zkClient - ZooKeeper connection for config reading
+   * @param clusterName - the cluster name for InstanceConfig and ClusterConfig
+   * @param instanceName - the instance name for InstanceConfig
+   * @return target thread pool size
+   */
+  public static int getTargetThreadPoolSize(RealmAwareZkClient zkClient, String clusterName,
+      String instanceName) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+
+    // Check instance config first for thread pool size
+    if (ZKUtil.isInstanceSetup(zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) {
+      InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName);
+      if (instanceConfig != null) {
+        int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+        // Reject negative values. The pool size is only negative when it's not set in
+        // InstanceConfig, or when the users bypassed the setter logic in InstanceConfig. We treat
+        // negative values as the value is not set, and continue with ClusterConfig.
+        if (targetTaskThreadPoolSize >= 0) {
+          return targetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn(
+            "Got null as InstanceConfig for instance {} in cluster {}. Continuing with ClusterConfig. ",
+            instanceName, clusterName);
+      }
+    }
+
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    if (clusterConfig != null) {
+      int globalTargetTaskThreadPoolSize = clusterConfig.getGlobalTargetTaskThreadPoolSize();
+      // Reject negative values. The pool size is only negative when it's not set in
+      // ClusterConfig, or when the users bypassed the setter logic in ClusterConfig. We treat
+      // negative values as the value is not set, and continue with the default value.
+      if (globalTargetTaskThreadPoolSize >= 0) {
+        return globalTargetTaskThreadPoolSize;
+      }
+    } else {
+      LOG.warn("Got null as ClusterConfig for cluster {}. Returning default value: {}. ",
+          clusterName, TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+    }
+
+    return TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
index 489a0aa..41ab950 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
@@ -28,7 +28,7 @@ import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.task.TaskConfig;
-import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,7 +113,7 @@ public class AssignableInstance {
     if (resourceCapacity == null) {
       resourceCapacity = new HashMap<>();
       resourceCapacity.put(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name(),
-          Integer.toString(TaskStateModelFactory.TASK_THREADPOOL_SIZE));
+          Integer.toString(_liveInstance.getCurrentTaskThreadPoolSize()));
       logger.debug("No resource capacity provided in LiveInstance {}, assuming default capacity: {}",
           _instanceConfig.getInstanceName(), resourceCapacity);
     }
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 77009e4..7fb5bc1 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -109,6 +109,16 @@ public final class HelixUtil {
     return path.substring(path.lastIndexOf('/') + 1);
   }
 
+  /**
+   * Converts a cluster name to a sharding key for routing purposes by prepending "/" unless the
+   * name already starts with "/". An empty name yields "/" instead of throwing.
+   * @param clusterName - cluster name; must not be null
+   * @return the sharding key corresponding to the cluster name
+   */
+  public static String clusterNameToShardingKey(String clusterName) {
+    return clusterName.startsWith("/") ? clusterName : "/" + clusterName;
+  }
+
   public static String serializeByComma(List<String> objects) {
     return Joiner.on(",").join(objects);
   }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
index 497de17..6118859 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
@@ -160,6 +160,8 @@ public class TestMultiZkHelixJavaApis {
     // MSDS endpoint: http://localhost:11117/admin/v2/namespaces/multiZkTest
     System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY, _msdsEndpoint);
 
+    // Routing data may be set by other tests using the same endpoint; reset() for good measure
+    RoutingDataManager.getInstance().reset();
     // Create a FederatedZkClient for admin work
     _zkClient =
         new FederatedZkClient(new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
index 7fc2014..ddf5440 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
@@ -25,8 +25,8 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowConfig;
 import org.testng.Assert;
@@ -73,7 +73,8 @@ public class TestTaskThreadLeak extends TaskTestBase {
     int threadCountAfter = getThreadCount("TaskStateModelFactory");
 
     Assert.assertTrue(
-        (threadCountAfter - _threadCountBefore) <= TaskStateModelFactory.TASK_THREADPOOL_SIZE + 1);
+        (threadCountAfter - _threadCountBefore) <= TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE
+            + 1);
   }
 
 
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
index a03f4f2..1ebe747 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
@@ -28,7 +28,10 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -145,6 +148,47 @@ public class TestParticipantManager extends ZkTestBase {
     deleteCluster(clusterName);
   }
 
+  /**
+   * Verifies that a participant started after a target task thread pool size was written to its
+   * InstanceConfig reports that same size as the current pool size in its LiveInstance.
+   */
+  @Test(dependsOnMethods = "testSessionExpiryCreateLiveInstance")
+  public void testCurrentTaskThreadPoolSizeCreation() throws Exception {
+    // Using a pool sized different from the default value to verify correctness
+    final int testThreadPoolSize = TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
+
+    final String className = TestHelper.getTestClassName();
+    final String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+
+    final ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
+        new ZkBaseDataAccessor.Builder<ZNRecord>().setZkAddress(ZK_ADDR).build());
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        10, // partitions per resource
+        5, // number of nodes
+        3, // replicas
+        "MasterSlave", true); // do rebalance
+
+    // Delete the cluster even if a later step throws, so a failure does not leak it
+    try {
+      final String instanceName = "localhost_12918";
+      final MockParticipantManager manager =
+          new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+
+      // The target size is written before the participant starts
+      InstanceConfig instanceConfig =
+          accessor.getProperty(keyBuilder.instanceConfig(instanceName));
+      instanceConfig.setTargetTaskThreadPoolSize(testThreadPoolSize);
+      accessor.setProperty(keyBuilder.instanceConfig(instanceName), instanceConfig);
+
+      manager.syncStart();
+      // Stop the participant only once it has been started, and even if an assertion fails
+      try {
+        final LiveInstance liveInstance =
+            accessor.getProperty(keyBuilder.liveInstance(instanceName));
+        Assert.assertNotNull(liveInstance);
+        Assert.assertEquals(liveInstance.getCurrentTaskThreadPoolSize(), testThreadPoolSize);
+      } finally {
+        manager.syncStop();
+      }
+    } finally {
+      // Clean up.
+      deleteCluster(clusterName);
+    }
+  }
+
   /*
    * Mocks PreConnectCallback to insert session expiry during ParticipantManager#handleNewSession()
    */
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
index 8ed56b9..6fa4bfa 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
@@ -167,7 +167,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
         accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_0"));
     Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 0);
     Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 0);
-    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 3);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 4);
 
     manager.disconnect();
 
@@ -180,7 +180,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
     liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_1"));
     Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
     Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
-    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 4);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
 
     manager.disconnect();
 
@@ -193,7 +193,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
     liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_2"));
     Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
     Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
-    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 6);
     Assert.assertFalse(liveInstance.getEphemeralOwner().equals("value"));
     Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
 
@@ -208,7 +208,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
     liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_3"));
     Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
     Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
-    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 6);
     Assert.assertFalse(liveInstance.getEphemeralOwner().equals("value"));
     Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
     String sessionId = liveInstance.getEphemeralOwner();
@@ -219,7 +219,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
     liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_3"));
     Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
     Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
-    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 6);
     Assert.assertFalse(liveInstance.getEphemeralOwner().equals("value"));
     Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
     Assert.assertFalse(sessionId.equals(liveInstance.getEphemeralOwner()));
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
index cc3ecf3..cd381dd 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -76,6 +76,30 @@ public class TestClusterConfig {
   }
 
   @Test
+  public void testGetGlobalTargetTaskThreadPoolSize() {
+    // Write the field on the underlying record directly so that only the getter is exercised
+    final int expectedSize = 100;
+    final String fieldName =
+        ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name();
+    ClusterConfig config = new ClusterConfig("testId");
+    config.getRecord().setIntField(fieldName, expectedSize);
+
+    Assert.assertEquals(config.getGlobalTargetTaskThreadPoolSize(), expectedSize);
+  }
+
+  @Test
+  public void testSetGlobalTargetTaskThreadPoolSize() {
+    final int expectedSize = 100;
+    final String fieldName =
+        ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name();
+    ClusterConfig config = new ClusterConfig("testId");
+    config.setGlobalTargetTaskThreadPoolSize(expectedSize);
+
+    // Read the underlying record directly; -1 is the fallback passed for a missing field
+    Assert.assertEquals(config.getRecord().getIntField(fieldName, -1), expectedSize);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testSetGlobalTargetTaskThreadPoolSizeIllegalArgument() {
+    // A negative size is expected to be rejected with IllegalArgumentException
+    final int negativeSize = -1;
+    new ClusterConfig("testId").setGlobalTargetTaskThreadPoolSize(negativeSize);
+  }
+
+  @Test
   public void testGetRebalancePreference() {
     Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
     preference.put(EVENNESS, 5);
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
index 08390e4..6ec6a4a 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
@@ -122,4 +122,28 @@ public class TestInstanceConfig {
     InstanceConfig testConfig = new InstanceConfig("testConfig");
     testConfig.setInstanceCapacityMap(capacityDataMap);
   }
+
+  @Test
+  public void testGetTargetTaskThreadPoolSize() {
+    // Write the field on the underlying record directly so that only the getter is exercised
+    final int expectedSize = 100;
+    final String fieldName =
+        InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name();
+    InstanceConfig config = new InstanceConfig("testConfig");
+    config.getRecord().setIntField(fieldName, expectedSize);
+
+    Assert.assertEquals(config.getTargetTaskThreadPoolSize(), expectedSize);
+  }
+
+  @Test
+  public void testSetTargetTaskThreadPoolSize() {
+    final int expectedSize = 100;
+    final String fieldName =
+        InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name();
+    InstanceConfig config = new InstanceConfig("testConfig");
+    config.setTargetTaskThreadPoolSize(expectedSize);
+
+    // Read the underlying record directly; -1 is the fallback passed for a missing field
+    Assert.assertEquals(config.getRecord().getIntField(fieldName, -1), expectedSize);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testSetTargetTaskThreadPoolSizeIllegalArgument() {
+    // A negative size is expected to be rejected with IllegalArgumentException
+    final int negativeSize = -1;
+    new InstanceConfig("testConfig").setTargetTaskThreadPoolSize(negativeSize);
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java b/helix-core/src/test/java/org/apache/helix/model/TestLiveInstance.java
similarity index 81%
rename from helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
rename to helix-core/src/test/java/org/apache/helix/model/TestLiveInstance.java
index c0a6931..39f0a57 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKLiveInstanceData.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestLiveInstance.java
@@ -1,4 +1,4 @@
-package org.apache.helix.manager.zk;
+package org.apache.helix.model;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -35,15 +35,27 @@ import org.apache.helix.LiveInstanceChangeListener;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.model.LiveInstance;
+import org.apache.helix.task.TaskConstants;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-public class TestZKLiveInstanceData extends ZkUnitTestBase {
+public class TestLiveInstance extends ZkUnitTestBase {
   private final String clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
 
+  /** Creates the test cluster and adds two instances to it. */
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    final String[] instances = { "localhost:54321", "localhost:54322" };
+
+    _gSetupTool.addCluster(clusterName, true);
+    _gSetupTool.addInstancesToCluster(clusterName, instances);
+  }
+
+  /** Deletes the cluster created in {@link #beforeClass()}. */
+  @AfterClass
+  public void afterClass() throws Exception {
+    deleteCluster(clusterName);
+  }
+
   @Test
   public void testDataChange() throws Exception {
     // Create an admin and add LiveInstanceChange listener to it
@@ -101,23 +113,6 @@ public class TestZKLiveInstanceData extends ZkUnitTestBase {
     Assert.assertTrue(instances.isEmpty(), "Expecting an empty list of live instance");
 
     adminManager.disconnect();
-
-  }
-
-  @BeforeClass()
-  public void beforeClass() throws Exception {
-    _gSetupTool.addCluster(clusterName, true);
-    _gSetupTool
-        .addInstancesToCluster(clusterName, new String[] { "localhost:54321", "localhost:54322" });
-  }
-
-  @AfterClass()
-  public void afterClass() throws Exception {
-    deleteCluster(clusterName);
-  }
-
-  private String[] getArgs(String... args) {
-    return args;
   }
 
   private List<LiveInstance> deepCopy(List<LiveInstance> instances) {
@@ -127,4 +122,28 @@ public class TestZKLiveInstanceData extends ZkUnitTestBase {
     }
     return result;
   }
+
+  @Test(dependsOnMethods = "testDataChange")
+  public void testGetCurrentTaskThreadPoolSize() {
+    // Write the field on the underlying record directly so that only the getter is exercised
+    final int expectedSize = 100;
+    final String fieldName = LiveInstance.LiveInstanceProperty.CURRENT_TASK_THREAD_POOL_SIZE.name();
+    LiveInstance liveInstance = new LiveInstance("testId");
+    liveInstance.getRecord().setIntField(fieldName, expectedSize);
+
+    Assert.assertEquals(liveInstance.getCurrentTaskThreadPoolSize(), expectedSize);
+  }
+
+  @Test(dependsOnMethods = "testGetCurrentTaskThreadPoolSize")
+  public void testGetCurrentTaskThreadPoolSizeDefault() {
+    // Nothing is written to the record, so the getter is expected to return the default size
+    LiveInstance untouchedLiveInstance = new LiveInstance("testId");
+
+    Assert.assertEquals(untouchedLiveInstance.getCurrentTaskThreadPoolSize(),
+        TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testGetCurrentTaskThreadPoolSizeDefault")
+  public void testSetCurrentTaskThreadPoolSize() {
+    // Round trip: the value written by the setter must be the value returned by the getter
+    final int expectedSize = 100;
+    LiveInstance liveInstance = new LiveInstance("testId");
+    liveInstance.setCurrentTaskThreadPoolSize(expectedSize);
+
+    Assert.assertEquals(liveInstance.getCurrentTaskThreadPoolSize(), expectedSize);
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskDriver.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskDriver.java
new file mode 100644
index 0000000..7d26b20
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskDriver.java
@@ -0,0 +1,100 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests the TaskDriver accessors for the target and current task thread pool sizes. The test
+ * methods are chained with dependsOnMethods, so the getter tests read back the values written by
+ * the setter tests that run before them.
+ */
+public class TestTaskDriver extends TaskTestBase {
+  private static final String NON_EXISTENT_INSTANCE_NAME = "NON_EXISTENT_INSTANCE_NAME";
+  // Deliberately different from the default value so a stored value can be told apart from it
+  private static final int TEST_THREAD_POOL_SIZE = TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
+
+  private TaskDriver _taskDriver;
+  private ConfigAccessor _configAccessor;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+
+    _configAccessor = _controller.getConfigAccessor();
+    _taskDriver = new TaskDriver(_controller);
+  }
+
+  /** Returns the name of the first participant, used as the valid instance in these tests. */
+  private String firstParticipantName() {
+    return _participants[0].getInstanceName();
+  }
+
+  @Test
+  public void testSetTargetTaskThreadPoolSize() {
+    String instanceName = firstParticipantName();
+    _taskDriver.setTargetTaskThreadPoolSize(instanceName, TEST_THREAD_POOL_SIZE);
+
+    // Read the value back through ConfigAccessor rather than through the driver under test
+    InstanceConfig storedConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName);
+    Assert.assertEquals(storedConfig.getTargetTaskThreadPoolSize(), TEST_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testSetTargetTaskThreadPoolSize",
+      expectedExceptions = IllegalArgumentException.class)
+  public void testSetTargetTaskThreadPoolSizeWrongInstanceName() {
+    _taskDriver.setTargetTaskThreadPoolSize(NON_EXISTENT_INSTANCE_NAME, TEST_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testSetTargetTaskThreadPoolSizeWrongInstanceName")
+  public void testGetTargetTaskThreadPoolSize() {
+    // Expects the value stored earlier in the chain by testSetTargetTaskThreadPoolSize
+    Assert.assertEquals(_taskDriver.getTargetTaskThreadPoolSize(firstParticipantName()),
+        TEST_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testGetTargetTaskThreadPoolSize",
+      expectedExceptions = IllegalArgumentException.class)
+  public void testGetTargetTaskThreadPoolSizeWrongInstanceName() {
+    _taskDriver.getTargetTaskThreadPoolSize(NON_EXISTENT_INSTANCE_NAME);
+  }
+
+  @Test(dependsOnMethods = "testGetTargetTaskThreadPoolSizeWrongInstanceName")
+  public void testSetGlobalTargetTaskThreadPoolSize() {
+    _taskDriver.setGlobalTargetTaskThreadPoolSize(TEST_THREAD_POOL_SIZE);
+
+    // Read the value back through ConfigAccessor rather than through the driver under test
+    ClusterConfig storedConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    Assert.assertEquals(storedConfig.getGlobalTargetTaskThreadPoolSize(), TEST_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testSetGlobalTargetTaskThreadPoolSize")
+  public void testGetGlobalTargetTaskThreadPoolSize() {
+    // Expects the value stored earlier in the chain by testSetGlobalTargetTaskThreadPoolSize
+    Assert.assertEquals(_taskDriver.getGlobalTargetTaskThreadPoolSize(), TEST_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testGetGlobalTargetTaskThreadPoolSize")
+  public void testGetCurrentTaskThreadPoolSize() {
+    // The targets set earlier in the chain are not expected to change the current size, which
+    // is asserted to still be the default
+    Assert.assertEquals(_taskDriver.getCurrentTaskThreadPoolSize(firstParticipantName()),
+        TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testGetCurrentTaskThreadPoolSize",
+      expectedExceptions = IllegalArgumentException.class)
+  public void testGetCurrentTaskThreadPoolSizeWrongInstanceName() {
+    _taskDriver.getCurrentTaskThreadPoolSize(NON_EXISTENT_INSTANCE_NAME);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
new file mode 100644
index 0000000..3919cde
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
@@ -0,0 +1,124 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.routing.RoutingDataManager;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests that the ZK client returned by TaskStateModelFactory#createZkClient can be used to read
+ * the target task thread pool size, with multi-ZK mode turned on and off.
+ */
+public class TestTaskStateModelFactory extends TaskTestBase {
+  // This value has to be different from the default value to verify correctness
+  private static final int TEST_TARGET_TASK_THREAD_POOL_SIZE =
+      TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
+
+  /**
+   * Sets an instance-level target pool size, then reads it back through a client created in
+   * multi-ZK mode against a mock MSDS server.
+   */
+  @Test
+  public void testConfigAccessorCreationMultiZk() throws Exception {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    InstanceConfig instanceConfig =
+        InstanceConfig.toInstanceConfig(anyParticipantManager.getInstanceName());
+    instanceConfig.setTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    anyParticipantManager.getConfigAccessor()
+        .setInstanceConfig(anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName(), instanceConfig);
+
+    // Start a msds server
+    // TODO: Refactor all MSDS_SERVER_ENDPOINT creation in system property to one place.
+    // Any test that modifies MSDS_SERVER_ENDPOINT system property and accesses
+    // HttpRoutingDataReader (ex. TestMultiZkHelixJavaApis and this test) will cause the
+    // MSDS_SERVER_ENDPOINT system property to be recorded as final in HttpRoutingDataReader; that
+    // means any test class that satisfies the aforementioned condition and is executed first gets
+    // to "decide" the default msds endpoint. The only workaround is for all these test classes to
+    // use the same default msds endpoint.
+    final String msdsHostName = "localhost";
+    final int msdsPort = 11117;
+    final String msdsNamespace = "multiZkTest";
+    Map<String, Collection<String>> routingData = new HashMap<>();
+    routingData
+        .put(ZK_ADDR, Collections.singletonList("/" + anyParticipantManager.getClusterName()));
+    MockMetadataStoreDirectoryServer msds =
+        new MockMetadataStoreDirectoryServer(msdsHostName, msdsPort, msdsNamespace, routingData);
+    msds.startServer();
+
+    // Save previously-set system configs
+    String prevMultiZkEnabled = System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+    String prevMsdsServerEndpoint =
+        System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
+    // Restore the system properties and stop the server even if the assertion below fails, so
+    // that a failure here cannot leak state into other tests
+    try {
+      // Turn on multiZk mode in System config
+      System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true");
+      // MSDS endpoint: http://localhost:11117/admin/v2/namespaces/multiZkTest
+      System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
+          "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + msdsNamespace);
+
+      RoutingDataManager.getInstance().reset();
+      // NOTE(review): the client is never closed here - confirm whether it should be
+      RealmAwareZkClient zkClient = TaskStateModelFactory.createZkClient(anyParticipantManager);
+      Assert.assertEquals(TaskUtil
+          .getTargetThreadPoolSize(zkClient, anyParticipantManager.getClusterName(),
+              anyParticipantManager.getInstanceName()), TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    } finally {
+      // Restore system properties under the same keys they were saved and set with
+      restoreSystemProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, prevMultiZkEnabled);
+      restoreSystemProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
+          prevMsdsServerEndpoint);
+      msds.stopServer();
+    }
+  }
+
+  /**
+   * Reads the target pool size through a client created with multi-ZK mode turned off. Relies on
+   * the instance config written by testConfigAccessorCreationMultiZk, which runs first.
+   */
+  @Test(dependsOnMethods = "testConfigAccessorCreationMultiZk")
+  public void testConfigAccessorCreationSingleZk() {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    // Save previously-set system configs
+    String prevMultiZkEnabled = System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+    try {
+      // Turn off multiZk mode in System config
+      System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "false");
+
+      // NOTE(review): the client is never closed here - confirm whether it should be
+      RealmAwareZkClient zkClient = TaskStateModelFactory.createZkClient(anyParticipantManager);
+      Assert.assertEquals(TaskUtil
+          .getTargetThreadPoolSize(zkClient, anyParticipantManager.getClusterName(),
+              anyParticipantManager.getInstanceName()), TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    } finally {
+      // Restore system properties even if the assertion above fails
+      restoreSystemProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, prevMultiZkEnabled);
+    }
+  }
+
+  /**
+   * Restores a system property to the value captured before a test changed it.
+   * @param key - name of the system property
+   * @param previousValue - value to restore, or null if the property was not set before
+   */
+  private static void restoreSystemProperty(String key, String previousValue) {
+    if (previousValue == null) {
+      System.clearProperty(key);
+    } else {
+      System.setProperty(key, previousValue);
+    }
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
index e04ef75..a85ba67 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
@@ -24,9 +24,14 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.integration.task.TaskTestBase;
 import org.apache.helix.integration.task.TaskTestUtil;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -35,6 +40,9 @@ import static org.mockito.Mockito.when;
 
 
 public class TestTaskUtil extends TaskTestBase {
+  // This value has to be different from the default value to verify correctness
+  private static final int TEST_TARGET_TASK_THREAD_POOL_SIZE =
+      TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
 
   @Test
   public void testGetExpiredJobsFromCache() {
@@ -182,4 +190,86 @@ public class TestTaskUtil extends TaskTestBase {
         .getExpiredJobsFromCache(workflowControllerDataProvider, workflow.getWorkflowConfig(),
             workflowContext), expectedJobs);
   }
+
+  @Test
+  public void testGetTaskThreadPoolSize() {
+    MockParticipantManager participant = _participants[0];
+    String clusterName = participant.getClusterName();
+    String instanceName = participant.getInstanceName();
+
+    // The instance-level target differs from the cluster-level one below; the assertion expects
+    // the instance-level value to be the one returned
+    InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(instanceName);
+    instanceConfig.setTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    participant.getConfigAccessor().setInstanceConfig(clusterName, instanceName, instanceConfig);
+
+    ClusterConfig clusterConfig = new ClusterConfig(clusterName);
+    clusterConfig.setGlobalTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE + 1);
+    participant.getConfigAccessor().setClusterConfig(clusterName, clusterConfig);
+
+    Assert.assertEquals(
+        TaskUtil.getTargetThreadPoolSize(participant.getZkClient(), clusterName, instanceName),
+        TEST_TARGET_TASK_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testGetTaskThreadPoolSize")
+  public void testGetTaskThreadPoolSizeInstanceConfigUndefined() {
+    MockParticipantManager participant = _participants[0];
+    String clusterName = participant.getClusterName();
+    String instanceName = participant.getInstanceName();
+
+    // Store an instance config on which no target pool size has been set
+    InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(instanceName);
+    participant.getConfigAccessor().setInstanceConfig(clusterName, instanceName, instanceConfig);
+
+    ClusterConfig clusterConfig = new ClusterConfig(clusterName);
+    clusterConfig.setGlobalTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    participant.getConfigAccessor().setClusterConfig(clusterName, clusterConfig);
+
+    // The cluster-level value is expected to be returned
+    Assert.assertEquals(
+        TaskUtil.getTargetThreadPoolSize(participant.getZkClient(), clusterName, instanceName),
+        TEST_TARGET_TASK_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testGetTaskThreadPoolSizeInstanceConfigUndefined")
+  public void testGetTaskThreadPoolSizeInstanceConfigDoesNotExist() {
+    MockParticipantManager participant = _participants[0];
+    String clusterName = participant.getClusterName();
+    String instanceName = participant.getInstanceName();
+
+    // Remove the instance config altogether before reading the pool size
+    HelixDataAccessor dataAccessor = participant.getHelixDataAccessor();
+    dataAccessor.removeProperty(dataAccessor.keyBuilder().instanceConfig(instanceName));
+
+    ClusterConfig clusterConfig = new ClusterConfig(clusterName);
+    clusterConfig.setGlobalTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    participant.getConfigAccessor().setClusterConfig(clusterName, clusterConfig);
+
+    // The cluster-level value is expected to be returned
+    Assert.assertEquals(
+        TaskUtil.getTargetThreadPoolSize(participant.getZkClient(), clusterName, instanceName),
+        TEST_TARGET_TASK_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testGetTaskThreadPoolSizeInstanceConfigDoesNotExist")
+  public void testGetTaskThreadPoolSizeClusterConfigUndefined() {
+    MockParticipantManager participant = _participants[0];
+    String clusterName = participant.getClusterName();
+
+    // Store a cluster config on which no global target pool size has been set
+    ClusterConfig clusterConfig = new ClusterConfig(clusterName);
+    participant.getConfigAccessor().setClusterConfig(clusterName, clusterConfig);
+
+    // The default size is expected to be returned
+    Assert.assertEquals(TaskUtil.getTargetThreadPoolSize(participant.getZkClient(), clusterName,
+        participant.getInstanceName()), TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+  }
+
+  @Test(dependsOnMethods = "testGetTaskThreadPoolSizeClusterConfigUndefined",
+      expectedExceptions = HelixException.class)
+  public void testGetTaskThreadPoolSizeClusterConfigDoesNotExist() {
+    MockParticipantManager participant = _participants[0];
+
+    // Remove the cluster config altogether; the lookup below is expected to throw
+    HelixDataAccessor dataAccessor = participant.getHelixDataAccessor();
+    dataAccessor.removeProperty(dataAccessor.keyBuilder().clusterConfig());
+
+    TaskUtil.getTargetThreadPoolSize(participant.getZkClient(), participant.getClusterName(),
+        participant.getInstanceName());
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
index e62bf81a..a451b8c 100644
--- a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
@@ -26,7 +26,7 @@ import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.task.TaskConfig;
-import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskConstants;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
@@ -55,18 +55,20 @@ public class TestAssignableInstance extends AssignerTestBase {
 
   @Test
   public void testInitializationWithQuotaUnset() {
+    int expectedCurrentTaskThreadPoolSize = 100; // test value reported through the LiveInstance below
+    LiveInstance liveInstance = createLiveInstance(null, null);
+    liveInstance.setCurrentTaskThreadPoolSize(expectedCurrentTaskThreadPoolSize); // the total-capacity assertion below compares against this
+
     // Initialize AssignableInstance with neither resource capacity nor quota ratio provided
     AssignableInstance ai = new AssignableInstance(createClusterConfig(null, null, false),
-        new InstanceConfig(testInstanceName), createLiveInstance(null, null));
+        new InstanceConfig(testInstanceName), liveInstance);
     Assert.assertEquals(ai.getUsedCapacity().size(), 1);
     Assert.assertEquals(
         (int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
-            .get(AssignableInstance.DEFAULT_QUOTA_TYPE),
-        0);
+            .get(AssignableInstance.DEFAULT_QUOTA_TYPE), 0); // no task is assigned in this test, so used capacity is expected to be 0
     Assert.assertEquals(
         (int) ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
-            .get(AssignableInstance.DEFAULT_QUOTA_TYPE),
-        TaskStateModelFactory.TASK_THREADPOOL_SIZE);
+            .get(AssignableInstance.DEFAULT_QUOTA_TYPE), expectedCurrentTaskThreadPoolSize); // total capacity is expected to match the size set on the LiveInstance
     Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
   }
 
@@ -91,10 +93,14 @@ public class TestAssignableInstance extends AssignerTestBase {
 
   @Test
   public void testInitializationWithOnlyQuotaType() {
+    int expectedCurrentTaskThreadPoolSize = 100;
+    LiveInstance liveInstance = createLiveInstance(null, null);
+    liveInstance.setCurrentTaskThreadPoolSize(expectedCurrentTaskThreadPoolSize);
+
     // Initialize AssignableInstance with only quota type provided
     AssignableInstance ai =
         new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, false),
-            new InstanceConfig(testInstanceName), createLiveInstance(null, null));
+            new InstanceConfig(testInstanceName), liveInstance);
 
     Assert.assertEquals(ai.getTotalCapacity().size(), 1);
     Assert.assertEquals(ai.getUsedCapacity().size(), 1);
@@ -106,7 +112,7 @@ public class TestAssignableInstance extends AssignerTestBase {
         testQuotaTypes.length);
     Assert.assertEquals(
         ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()),
-        calculateExpectedQuotaPerType(TaskStateModelFactory.TASK_THREADPOOL_SIZE, testQuotaTypes,
+        calculateExpectedQuotaPerType(expectedCurrentTaskThreadPoolSize, testQuotaTypes,
             testQuotaRatio));
     Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
   }
@@ -171,12 +177,16 @@ public class TestAssignableInstance extends AssignerTestBase {
 
   @Test
   public void testNormalTryAssign() {
+    int testCurrentTaskThreadPoolSize = 100;
+    LiveInstance liveInstance = createLiveInstance(null, null);
+    liveInstance.setCurrentTaskThreadPoolSize(testCurrentTaskThreadPoolSize);
+
     AssignableInstance ai = new AssignableInstance(createClusterConfig(null, null, true),
-        new InstanceConfig(testInstanceName), createLiveInstance(null, null));
+        new InstanceConfig(testInstanceName), liveInstance);
 
     // When nothing is configured, we should use default quota type to assign
     Map<String, TaskAssignResult> results = new HashMap<>();
-    for (int i = 0; i < TaskStateModelFactory.TASK_THREADPOOL_SIZE; i++) {
+    for (int i = 0; i < testCurrentTaskThreadPoolSize; i++) {
       String taskId = Integer.toString(i);
       TaskConfig task = new TaskConfig("", null, taskId, null);
       TaskAssignResult result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);