You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/12 21:24:09 UTC

helix git commit: [HELIX-729] Modify APIs and TaskDataCache so that it has AssignableInstanceManager

Repository: helix
Updated Branches:
  refs/heads/master b654c543c -> 304a8f863


[HELIX-729] Modify APIs and TaskDataCache so that it has AssignableInstanceManager

To support scheduling based on quota type, integration with the existing pipeline requires access to the most recent AssignableInstanceManager at the cluster cache level. AssignableInstanceManager was therefore added to TaskDataCache, and an API was added for easy access.
TaskConfig's getQuotaType() API was removed, so every component that relied on that API must now have the quota type supplied explicitly as a parameter.
The APIs for AssignableInstance were modified so that quotaType is an explicit argument.

Changelist:
1. AssignableInstanceManager was added to TaskDataCache and an API was added for easy access
2. TaskAssigner has another API that explicitly has quota type as a parameter
3. ThreadCountBasedTaskAssigner implements the new interface API
4. AssignableInstance's APIs for assigning and releasing tasks now explicitly require quotaType as a parameter
5. TaskAssignResult also stores quotaType
6. Tests modified so that quotaType is supplied to assign and release calls
7. The quota type setter was removed from TaskConfig, because quota types are defined only down to the job level, not per task
8. Updated JavaDoc


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/304a8f86
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/304a8f86
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/304a8f86

Branch: refs/heads/master
Commit: 304a8f8633676c1eb0879637cae33b96bf7754cb
Parents: b654c54
Author: Hunter Lee <na...@gmail.com>
Authored: Thu Jul 12 11:43:45 2018 -0700
Committer: Hunter Lee <na...@gmail.com>
Committed: Thu Jul 12 14:23:03 2018 -0700

----------------------------------------------------------------------
 .../helix/common/caches/TaskDataCache.java      |  49 ++++-
 .../controller/stages/ClusterDataCache.java     | 108 +++++++----
 .../helix/task/AssignableInstanceManager.java   |   9 +-
 .../java/org/apache/helix/task/TaskConfig.java  |  26 +--
 .../helix/task/assigner/AssignableInstance.java | 102 +++++------
 .../helix/task/assigner/TaskAssignResult.java   |  20 ++-
 .../helix/task/assigner/TaskAssigner.java       |  23 ++-
 .../assigner/ThreadCountBasedTaskAssigner.java  |  60 ++++---
 .../task/TestAssignableInstanceManager.java     |   1 -
 ...signableInstanceManagerControllerSwitch.java |   4 +-
 .../helix/task/assigner/AssignerTestBase.java   |   7 +-
 .../task/assigner/TestAssignableInstance.java   | 180 ++++++++++---------
 .../TestThreadCountBasedTaskAssigner.java       |  31 ++--
 13 files changed, 364 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/304a8f86/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index bc01d39..478b03a 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -28,6 +28,9 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.task.AssignableInstanceManager;
 import org.apache.helix.task.JobConfig;
@@ -35,13 +38,14 @@ import org.apache.helix.task.JobContext;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
+import org.apache.helix.controller.LogUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Cache for holding all task related cluster data, such as WorkflowConfig, JobConfig and Contexts.
  */
-public class TaskDataCache {
+public class TaskDataCache extends AbstractDataCache {
   private static final Logger LOG = LoggerFactory.getLogger(TaskDataCache.class.getName());
   private static final String NAME = "NAME";
 
@@ -49,7 +53,13 @@ public class TaskDataCache {
   private Map<String, JobConfig> _jobConfigMap = new HashMap<>();
   private Map<String, WorkflowConfig> _workflowConfigMap = new HashMap<>();
   private Map<String, ZNRecord> _contextMap = new HashMap<>();
+  // The following fields have been added for quota-based task scheduling
+  private final AssignableInstanceManager _assignableInstanceManager = new AssignableInstanceManager();
 
+  /**
+   * Original constructor for TaskDataCache.
+   * @param clusterName
+   */
   public TaskDataCache(String clusterName) {
     _clusterName = clusterName;
   }
@@ -79,6 +89,28 @@ public class TaskDataCache {
     return true;
   }
 
+  /**
+   * Refreshes Task Framework contexts and configs from ZooKeeper. This method also re-instantiates
+   * AssignableInstanceManager.
+   * @param accessor
+   * @param resourceConfigMap
+   * @param liveInstanceMap
+   * @param instanceConfigMap
+   * @return
+   */
+  public synchronized boolean refresh(HelixDataAccessor accessor,
+      Map<String, ResourceConfig> resourceConfigMap, ClusterConfig clusterConfig,
+      Map<String, LiveInstance> liveInstanceMap, Map<String, InstanceConfig> instanceConfigMap) {
+    // First, call the original refresh for contexts and configs
+    if (refresh(accessor, resourceConfigMap)) {
+      // Upon refresh success, re-instantiate AssignableInstanceManager from scratch
+      _assignableInstanceManager.buildAssignableInstances(clusterConfig, this, liveInstanceMap,
+          instanceConfigMap);
+      return true;
+    }
+    return false;
+  }
+
   private void refreshJobContexts(HelixDataAccessor accessor) {
     // TODO: Need an optimize for reading context only if the refresh is needed.
     long start = System.currentTimeMillis();
@@ -104,14 +136,15 @@ public class TaskDataCache {
         _contextMap.put(context.getSimpleField(NAME), context);
       } else {
         _contextMap.put(childNames.get(i), context);
-        LOG.debug(
+        LogUtil.logDebug(LOG, getEventId(),
             String.format("Context for %s is null or miss the context NAME!", childNames.get((i))));
       }
     }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("# of workflow/job context read from zk: " + _contextMap.size() + ". Take "
-          + (System.currentTimeMillis() - start) + " ms");
+      LogUtil.logDebug(LOG, getEventId(),
+          "# of workflow/job context read from zk: " + _contextMap.size() + ". Take " + (
+              System.currentTimeMillis() - start) + " ms");
     }
   }
 
@@ -207,6 +240,14 @@ public class TaskDataCache {
     return _contextMap;
   }
 
+  /**
+   * Returns the current AssignableInstanceManager instance.
+   * @return
+   */
+  public AssignableInstanceManager getAssignableInstanceManager() {
+    return _assignableInstanceManager;
+  }
+
   @Override
   public String toString() {
     return "TaskDataCache{"

http://git-wip-us.apache.org/repos/asf/helix/blob/304a8f86/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 87a3a10..ca2fa76 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -36,6 +36,7 @@ import org.apache.helix.common.caches.CurrentStateCache;
 import org.apache.helix.common.caches.IdealStateCache;
 import org.apache.helix.common.caches.InstanceMessagesCache;
 import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -50,6 +51,7 @@ import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.task.AssignableInstanceManager;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.TaskConstants;
@@ -85,6 +87,7 @@ public class ClusterDataCache {
   private Map<String, ExternalView> _externalViewMap = new HashMap<>();
   private Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
   private Set<String> _disabledInstanceSet = new HashSet<>();
+  private String _eventId = "NO_ID";
 
   private IdealStateCache _idealStateCache;
   private CurrentStateCache _currentStateCache;
@@ -142,8 +145,11 @@ public class ClusterDataCache {
       _propertyDataChangedMap.put(ChangeType.IDEAL_STATE, false);
       clearCachedResourceAssignments();
       _idealStateCache.refresh(accessor);
-      LOG.info("Refresh IdealStates for cluster " + _clusterName + ", took " + (
-          System.currentTimeMillis() - startTime) + " ms");
+      LogUtil.logInfo(LOG, _eventId,
+          "Refresh IdealStates for cluster " + _clusterName + ", took " + (
+              System.currentTimeMillis() - startTime) + " ms for " + (_isTaskCache
+              ? "TASK"
+              : "DEFAULT") + "pipeline");
     }
 
     if (_propertyDataChangedMap.get(ChangeType.LIVE_INSTANCE)) {
@@ -152,15 +158,21 @@ public class ClusterDataCache {
       clearCachedResourceAssignments();
       _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances(), true);
       _updateInstanceOfflineTime = true;
-      LOG.info("Refresh LiveInstances for cluster " + _clusterName + ", took " + (
-          System.currentTimeMillis() - startTime) + " ms");
+      LogUtil.logInfo(LOG, _eventId,
+          "Refresh LiveInstances for cluster " + _clusterName + ", took " + (
+              System.currentTimeMillis() - startTime) + " ms for " + (_isTaskCache
+              ? "TASK"
+              : "DEFAULT") + "pipeline");
     }
 
     if (_propertyDataChangedMap.get(ChangeType.INSTANCE_CONFIG)) {
       _propertyDataChangedMap.put(ChangeType.INSTANCE_CONFIG, false);
       clearCachedResourceAssignments();
       _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs(), true);
-      LOG.info("Reload InstanceConfig: " + _instanceConfigCacheMap.keySet());
+      LogUtil.logInfo(LOG, _eventId,
+          "Reload InstanceConfig: " + _instanceConfigCacheMap.keySet() + " for " + (_isTaskCache
+              ? "TASK"
+              : "DEFAULT") + "pipeline");
     }
 
     if (_propertyDataChangedMap.get(ChangeType.RESOURCE_CONFIG)) {
@@ -168,12 +180,15 @@ public class ClusterDataCache {
       clearCachedResourceAssignments();
       _resourceConfigCacheMap =
           accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs(), true);
-      LOG.info("Reload ResourceConfigs: " + _resourceConfigCacheMap.keySet());
+      LogUtil.logInfo(LOG, _eventId,
+          "Reload ResourceConfigs: " + _resourceConfigCacheMap.keySet() + " for " + (_isTaskCache
+              ? "TASK"
+              : "DEFAULT") + "pipeline");
 
     }
 
     _liveInstanceMap = new HashMap<>(_liveInstanceCacheMap);
-    _liveInstanceMap = new HashMap(_liveInstanceCacheMap);
+    _liveInstanceMap = new HashMap<>(_liveInstanceCacheMap);
     _instanceConfigMap = new ConcurrentHashMap<>(_instanceConfigCacheMap);
     _resourceConfigMap = new HashMap<>(_resourceConfigCacheMap);
 
@@ -181,31 +196,30 @@ public class ClusterDataCache {
       updateOfflineInstanceHistory(accessor);
     }
 
-    if (_isTaskCache) {
-      _taskDataCache.refresh(accessor, _resourceConfigMap);
-    }
-
     Map<String, StateModelDefinition> stateDefMap =
         accessor.getChildValuesMap(keyBuilder.stateModelDefs(), true);
     _stateModelDefMap = new ConcurrentHashMap<>(stateDefMap);
     _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints(), true);
     _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
 
+    if (_isTaskCache) {
+      _taskDataCache.refresh(accessor, _resourceConfigMap, _clusterConfig, _liveInstanceMap,
+          _instanceConfigMap);
+    }
 
-    _instanceMessagesCache
-        .refresh(accessor, _liveInstanceMap);
+    _instanceMessagesCache.refresh(accessor, _liveInstanceMap);
     _currentStateCache.refresh(accessor, _liveInstanceMap);
 
     // current state must be refreshed before refreshing relay messages
     // because we need to use current state to validate all relay messages.
-    _instanceMessagesCache
-        .updateRelayMessages(_liveInstanceMap, _currentStateCache.getCurrentStatesMap());
+    _instanceMessagesCache.updateRelayMessages(_liveInstanceMap, _currentStateCache.getCurrentStatesMap());
 
     if (_clusterConfig != null) {
       _idealStateRuleMap = _clusterConfig.getIdealStateRules();
     } else {
       _idealStateRuleMap = new HashMap<>();
-      LOG.warn("Cluster config is null!");
+      LogUtil.logWarn(LOG, _eventId,
+          "Cluster config is null for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
     }
 
     MaintenanceSignal maintenanceSignal = accessor.getProperty(keyBuilder.maintenance());
@@ -214,22 +228,27 @@ public class ClusterDataCache {
     updateDisabledInstances();
 
     long endTime = System.currentTimeMillis();
-    LOG.info(
+    LogUtil.logInfo(LOG, _eventId,
         "END: ClusterDataCache.refresh() for cluster " + getClusterName() + ", took " + (endTime
-            - startTime) + " ms");
+            - startTime) + " ms for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("# of StateModelDefinition read from zk: " + _stateModelDefMap.size());
-      LOG.debug("# of ConstraintMap read from zk: " + _constraintMap.size());
-      LOG.debug("LiveInstances: " + _liveInstanceMap.keySet());
+      LogUtil.logDebug(LOG, _eventId,
+          "# of StateModelDefinition read from zk: " + _stateModelDefMap.size());
+      LogUtil
+          .logDebug(LOG, _eventId, "# of ConstraintMap read from zk: " + _constraintMap.size());
+      LogUtil.logDebug(LOG, _eventId, "LiveInstances: " + _liveInstanceMap.keySet());
       for (LiveInstance instance : _liveInstanceMap.values()) {
-        LOG.debug("live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
+        LogUtil.logDebug(LOG, _eventId,
+            "live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
       }
-      LOG.debug("IdealStates: " + _idealStateCache.getIdealStateMap().keySet());
-      LOG.debug("ResourceConfigs: " + _resourceConfigMap.keySet());
-      LOG.debug("InstanceConfigs: " + _instanceConfigMap.keySet());
-      LOG.debug("ClusterConfigs: " + _clusterConfig);
-      LOG.debug("JobContexts: " + _taskDataCache.getContexts().keySet());
+      LogUtil
+          .logDebug(LOG, _eventId, "IdealStates: " + _idealStateCache.getIdealStateMap().keySet());
+      LogUtil.logDebug(LOG, _eventId, "ResourceConfigs: " + _resourceConfigMap.keySet());
+      LogUtil.logDebug(LOG, _eventId, "InstanceConfigs: " + _instanceConfigMap.keySet());
+      LogUtil.logDebug(LOG, _eventId, "ClusterConfigs: " + _clusterConfig);
+      LogUtil
+          .logDebug(LOG, _eventId, "JobContexts: " + _taskDataCache.getContexts().keySet());
     }
 
     if (LOG.isTraceEnabled()) {
@@ -282,7 +301,8 @@ public class ClusterDataCache {
         history.reportOffline();
         // persist history back to ZK.
         if (!accessor.setProperty(propertyKey, history)) {
-          LOG.error("Fails to persist participant online history back to ZK!");
+          LogUtil
+              .logError(LOG, _eventId, "Fails to persist participant online history back to ZK!");
         }
       }
       _instanceOfflineTimeMap.put(instance, history.getLastOfflineTime());
@@ -320,7 +340,7 @@ public class ClusterDataCache {
   }
 
   public synchronized void setIdealStates(List<IdealState> idealStates) {
-   _idealStateCache.setIdealStates(idealStates);
+    _idealStateCache.setIdealStates(idealStates);
   }
 
   public Map<String, Map<String, String>> getIdealStateRules() {
@@ -601,11 +621,18 @@ public class ClusterDataCache {
           try {
             replicas = Integer.parseInt(replicasStr);
           } catch (Exception e) {
-            LOG.error("invalid replicas string: " + replicasStr);
+            LogUtil.logError(LOG, _eventId,
+                "invalid replicas string: " + replicasStr + " for " + (_isTaskCache
+                    ? "TASK"
+                    : "DEFAULT") + "pipeline");
           }
         }
       } else {
-        LOG.error("idealState for resource: " + resourceName + " does NOT have replicas");
+        LogUtil.logError(LOG, _eventId,
+            "idealState for resource: " + resourceName + " does NOT have replicas for " + (
+                _isTaskCache
+                    ? "TASK"
+                    : "DEFAULT") + "pipeline");
       }
     }
     return replicas;
@@ -708,6 +735,14 @@ public class ClusterDataCache {
     return _taskDataCache.getContexts();
   }
 
+  /**
+   * Returns AssignableInstanceManager.
+   * @return
+   */
+  public AssignableInstanceManager getAssignableInstanceManager() {
+    return _taskDataCache.getAssignableInstanceManager();
+  }
+
   public ExternalView getTargetExternalView(String resourceName) {
     return _targetExternalViewMap.get(resourceName);
   }
@@ -867,6 +902,17 @@ public class ClusterDataCache {
     _lastTopStateLocationMap.clear();
   }
 
+  public String getEventId() {
+    return _eventId;
+  }
+
+  public void setEventId(String eventId) {
+    _eventId = eventId;
+    _idealStateCache.setEventId(eventId);
+    _currentStateCache.setEventId(eventId);
+    _taskDataCache.setEventId(eventId);
+  }
+
   /**
    * toString method to print the entire cluster state
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/304a8f86/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index 829aa72..f2f0129 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -44,7 +44,8 @@ public class AssignableInstanceManager {
   private boolean _hasBeenBuilt; // Flag for whether AssignableInstances have been built
 
   /**
-   * Basic constructor for AssignableInstanceManager to allow an empty instantiation. buildAssignableInstances() must be explicitly called after instantiation.
+   * Basic constructor for AssignableInstanceManager to allow an empty instantiation.
+   * buildAssignableInstances() must be explicitly called after instantiation.
    */
   public AssignableInstanceManager() {
     _assignableInstanceMap = new HashMap<>();
@@ -96,6 +97,7 @@ public class AssignableInstanceManager {
             jobName, jobConfig, jobContext);
         continue; // Ignore this job if either the config or context is null
       }
+      String quotaType = jobConfig.getQuotaType();
       Set<Integer> taskIndices = jobContext.getPartitionSet(); // Each integer represents a task in
       // this job (this is NOT taskId)
       for (int taskIndex : taskIndices) {
@@ -118,7 +120,7 @@ public class AssignableInstanceManager {
             TaskConfig taskConfig = jobConfig.getTaskConfig(taskId);
             AssignableInstance assignableInstance = _assignableInstanceMap.get(assignedInstance);
             TaskAssignResult taskAssignResult =
-                assignableInstance.restoreTaskAssignResult(taskId, taskConfig);
+                assignableInstance.restoreTaskAssignResult(taskId, taskConfig, quotaType);
             if (taskAssignResult.isSuccessful()) {
               _taskAssignResultMap.put(taskId, taskAssignResult);
               LOG.info("TaskAssignResult restored for taskId: {}, assigned on instance: {}", taskId,
@@ -134,7 +136,8 @@ public class AssignableInstanceManager {
         }
       }
     }
-    _hasBeenBuilt = true; // Set the flag so that it's not re-building from cache every pipeline iteration
+    _hasBeenBuilt = true; // Set the flag so that it's not re-building from cache every pipeline
+    // iteration
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/304a8f86/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index cb963a8..f080f6d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -32,18 +32,16 @@ import org.slf4j.LoggerFactory;
  * Configuration for an individual task to be run as part of a job.
  */
 public class TaskConfig {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskConfig.class);
+
   private enum TaskConfigProperty {
     TASK_ID,
     TASK_COMMAND,
     @Deprecated
     TASK_SUCCESS_OPTIONAL,
-    TASK_TARGET_PARTITION,
-    TASK_QUOTA_TYPE //TODO: remove
+    TASK_TARGET_PARTITION
   }
 
-  private static final Logger LOG = LoggerFactory.getLogger(TaskConfig.class);
-  public static final String DEFAULT_QUOTA_TYPE = "DEFAULT"; //TODO: remove!
-
   private final Map<String, String> _configMap;
 
   @Deprecated
@@ -232,22 +230,4 @@ public class TaskConfig {
       return new TaskConfig(command, rawConfigMap, taskId, targetPartition);
     }
   }
-
-  //TODO: remove the following
-  /**
-   * Set the quota type of this task
-   * @param quotaType
-   */
-  public void setQuotaType(String quotaType) {
-    _configMap.put(TaskConfigProperty.TASK_QUOTA_TYPE.name(), quotaType);
-  }
-
-  /**
-   * Return the quota type of this task
-   * @return
-   */
-  public String getQuotaType() {
-    return _configMap.containsKey(TaskConfigProperty.TASK_QUOTA_TYPE.name()) ?
-        _configMap.get(TaskConfigProperty.TASK_QUOTA_TYPE.name()) : DEFAULT_QUOTA_TYPE;
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/304a8f86/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
index b56b3b9..4e51f80 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
  */
 public class AssignableInstance {
   private static final Logger logger = LoggerFactory.getLogger(AssignableInstance.class);
+  public static final String DEFAULT_QUOTA_TYPE = "DEFAULT";
 
   /**
    * Fitness score will be calculated from 0 to 1000
@@ -68,13 +69,14 @@ public class AssignableInstance {
   public AssignableInstance(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
       LiveInstance liveInstance) {
     if (clusterConfig == null || instanceConfig == null || liveInstance == null) {
-      throw new IllegalArgumentException(
-          "ClusterConfig, InstanceConfig, LiveInstance cannot be null!");
+      throw new IllegalArgumentException(String.format(
+          "ClusterConfig, InstanceConfig, LiveInstance cannot be null! ClusterConfig null: %s, InstanceConfig null: %s, LiveInstance null: %s",
+          clusterConfig == null, instanceConfig == null, liveInstance == null));
     }
 
     if (!instanceConfig.getInstanceName().equals(liveInstance.getInstanceName())) {
-      throw new IllegalArgumentException(String
-          .format("Instance name from LiveInstance (%s) and InstanceConfig (%s) don't match!",
+      throw new IllegalArgumentException(
+          String.format("Instance name from LiveInstance (%s) and InstanceConfig (%s) don't match!",
               liveInstance.getInstanceName(), instanceConfig.getInstanceName()));
     }
     _clusterConfig = clusterConfig;
@@ -90,15 +92,15 @@ public class AssignableInstance {
   /**
    * When task quota ratio / instance's resource capacity change, we need to update instance
    * capacity cache. Couple of corner cases to clarify for updating capacity:
-   *    1. User shrinks capacity and used capacity exceeds total capacity - current assignment
-   *       will not be affected (used > total is ok) but no further assignment decision will
-   *       be made on this instance until spaces get freed up
-   *    2. User removed a quotaType but there are still tasks with stale quota type assigned on
-   *       this instance - current assignment will not be affected, and further assignment will
-   *       NOT be made for stale quota type
-   *    3. User removed a resourceType but there are still tasks with stale resource type assigned
-   *       on this instance - current assignment will not be affected, but no further assignment
-   *       with stale resource type request will be allowed on this instance
+   * 1. User shrinks capacity and used capacity exceeds total capacity - current assignment
+   * will not be affected (used > total is ok) but no further assignment decision will
+   * be made on this instance until spaces get freed up
+   * 2. User removed a quotaType but there are still tasks with stale quota type assigned on
+   * this instance - current assignment will not be affected, and further assignment will
+   * NOT be made for stale quota type
+   * 3. User removed a resourceType but there are still tasks with stale resource type assigned
+   * on this instance - current assignment will not be affected, but no further assignment
+   * with stale resource type request will be allowed on this instance
    */
   private void refreshTotalCapacity() {
     // Create a temp total capacity record in case we fail to parse configurations, we
@@ -117,7 +119,7 @@ public class AssignableInstance {
 
     if (typeQuotaRatio == null) {
       typeQuotaRatio = new HashMap<>();
-      typeQuotaRatio.put(TaskConfig.DEFAULT_QUOTA_TYPE, Integer.toString(1));
+      typeQuotaRatio.put(DEFAULT_QUOTA_TYPE, Integer.toString(1));
       logger.info("No quota type ratio provided in LiveInstance {}, assuming default ratio: {}",
           _instanceConfig.getInstanceName(), typeQuotaRatio);
     }
@@ -148,7 +150,7 @@ public class AssignableInstance {
           // Calculate total quota for a given type
           String quotaType = typeQuotaEntry.getKey();
           int quotaRatio = Integer.valueOf(typeQuotaEntry.getValue());
-          int quota = Math.round(capacity * (float)quotaRatio / (float)totalRatio);
+          int quota = Math.round(capacity * (float) quotaRatio / (float) totalRatio);
 
           // Honor non-zero quota ratio for non-zero capacity even if it is rounded to zero
           if (capacity != 0 && quotaRatio != 0 && quota == 0) {
@@ -239,23 +241,23 @@ public class AssignableInstance {
   /**
    * Tries to assign the given task on this instance and returns TaskAssignResult. Instance capacity
    * profile is NOT modified by tryAssign.
-   *
    * When calculating fitness of an assignment, this function will rate assignment from 0 to 1000,
    * and the assignment that has a higher score will be a better fit.
-   *
    * @param task task config
+   * @param quotaType quota type of the task
    * @return TaskAssignResult
    * @throws IllegalArgumentException if task is null
    */
-  public TaskAssignResult tryAssign(TaskConfig task) throws IllegalArgumentException {
+  public TaskAssignResult tryAssign(TaskConfig task, String quotaType)
+      throws IllegalArgumentException {
     if (task == null) {
       throw new IllegalArgumentException("Task is null!");
     }
 
     if (_currentAssignments.contains(task.getId())) {
-      return new TaskAssignResult(task, this, false, 0,
-          TaskAssignResult.FailureReason.TASK_ALREADY_ASSIGNED, String
-          .format("Task %s is already assigned to this instance. Need to release it first",
+      return new TaskAssignResult(task, quotaType, this, false, 0,
+          TaskAssignResult.FailureReason.TASK_ALREADY_ASSIGNED,
+          String.format("Task %s is already assigned to this instance. Need to release it first",
               task.getId()));
     }
 
@@ -264,47 +266,44 @@ public class AssignableInstance {
 
     // Fail when no such resource type
     if (!_totalCapacity.containsKey(resourceType)) {
-      return new TaskAssignResult(task, this, false, 0,
-          TaskAssignResult.FailureReason.NO_SUCH_RESOURCE_TYPE, String
-          .format("Requested resource type %s not supported. Available resource types: %s",
+      return new TaskAssignResult(task, quotaType, this, false, 0,
+          TaskAssignResult.FailureReason.NO_SUCH_RESOURCE_TYPE,
+          String.format("Requested resource type %s not supported. Available resource types: %s",
               resourceType, _totalCapacity.keySet()));
     }
 
-    String quotaType = task.getQuotaType();
-
     // Fail when no such quota type
     if (!_totalCapacity.get(resourceType).containsKey(quotaType)) {
-      return new TaskAssignResult(task, this, false, 0,
-          TaskAssignResult.FailureReason.NO_SUCH_QUOTA_TYPE, String
-          .format("Requested quota type %s not defined. Available quota types: %s", quotaType,
+      return new TaskAssignResult(task, quotaType, this, false, 0,
+          TaskAssignResult.FailureReason.NO_SUCH_QUOTA_TYPE,
+          String.format("Requested quota type %s not defined. Available quota types: %s", quotaType,
               _totalCapacity.get(resourceType).keySet()));
     }
 
     int capacity = _totalCapacity.get(resourceType).get(quotaType);
-    int usage =  _usedCapacity.get(resourceType).get(quotaType);
+    int usage = _usedCapacity.get(resourceType).get(quotaType);
 
     // Fail with insufficient quota
     if (capacity <= usage) {
-      return new TaskAssignResult(task, this, false, 0,
-          TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA, String
-          .format("Insufficient quota %s::%s. Capacity: %s, Current Usage: %s", resourceType,
+      return new TaskAssignResult(task, quotaType, this, false, 0,
+          TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA,
+          String.format("Insufficient quota %s::%s. Capacity: %s, Current Usage: %s", resourceType,
               quotaType, capacity, usage));
     }
 
     // More remaining capacity leads to higher fitness score
-    int fitness = Math.round((float)(capacity - usage) / capacity * fitnessScoreFactor);
+    int fitness = Math.round((float) (capacity - usage) / capacity * fitnessScoreFactor);
 
-    return new TaskAssignResult(task, this, true, fitness,
-        null, "");
+    return new TaskAssignResult(task, quotaType, this, true, fitness, null, "");
   }
 
   /**
    * Performs the following to accept a task:
    * 1. Deduct the amount of resource required by this task
    * 2. Add this TaskAssignResult to _currentAssignments
-   * @param result
+   * @param result TaskAssignResult
    * @throws IllegalStateException if TaskAssignResult is not successful or the task is double
-   *                              assigned, or the task is not assigned to this instance
+   *           assigned, or the task is not assigned to this instance
    */
   public void assign(TaskAssignResult result) throws IllegalStateException {
     if (!result.isSuccessful()) {
@@ -327,13 +326,13 @@ public class AssignableInstance {
     // update resource usage
     // TODO (harry): get requested resource type from task config
     String resourceType = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name();
-    String quotaType = result.getTaskConfig().getQuotaType();
+    String quotaType = result.getQuotaType();
 
     // Resource type / quota type might have already changed, i.e. we are recovering
     // current assignments for a live instance, but currently running tasks's quota
     // type has already been removed by user. So we do the deduction with best effort
-    if (_usedCapacity.containsKey(resourceType) && _usedCapacity.get(resourceType)
-        .containsKey(quotaType)) {
+    if (_usedCapacity.containsKey(resourceType)
+        && _usedCapacity.get(resourceType).containsKey(quotaType)) {
       int curUsage = _usedCapacity.get(resourceType).get(quotaType);
       _usedCapacity.get(resourceType).put(quotaType, curUsage + 1);
     } else {
@@ -351,20 +350,20 @@ public class AssignableInstance {
    * 1. Release the resource by adding back what the task required.
    * 2. Remove the TaskAssignResult from _currentAssignments
    * @param taskConfig config of this task
+   * @param quotaType quota type this task belongs to
    */
-  public void release(TaskConfig taskConfig) {
+  public void release(TaskConfig taskConfig, String quotaType) {
     if (!_currentAssignments.contains(taskConfig.getId())) {
       logger.warn("Task {} is not assigned on instance {}", taskConfig.getId(),
           _instanceConfig.getInstanceName());
       return;
     }
-    String quotaType = taskConfig.getQuotaType();
     String resourceType = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name();
 
     // We might be releasing a task whose resource requirement / quota type is out-dated,
     // thus we need to check to avoid NPE
-    if (_usedCapacity.containsKey(resourceType) && _usedCapacity.get(resourceType)
-        .containsKey(quotaType)) {
+    if (_usedCapacity.containsKey(resourceType)
+        && _usedCapacity.get(resourceType).containsKey(quotaType)) {
       int curUsage = _usedCapacity.get(resourceType).get(quotaType);
       _usedCapacity.get(resourceType).put(quotaType, curUsage - 1);
     }
@@ -383,15 +382,16 @@ public class AssignableInstance {
    * @return TaskAssignResult with isSuccessful = true if successful. If assigning it to an instance
    *         fails, TaskAssignResult's getSuccessful() will return false
    */
-  public TaskAssignResult restoreTaskAssignResult(String taskId, TaskConfig taskConfig) {
-    TaskAssignResult assignResult = new TaskAssignResult(taskConfig, this, true, fitnessScoreFactor,
-        null, "Recovered TaskAssignResult from current state");
+  public TaskAssignResult restoreTaskAssignResult(String taskId, TaskConfig taskConfig,
+      String quotaType) {
+    TaskAssignResult assignResult = new TaskAssignResult(taskConfig, quotaType, this, true,
+        fitnessScoreFactor, null, "Recovered TaskAssignResult from current state");
     try {
       assign(assignResult);
     } catch (IllegalStateException e) {
-      logger.error("Failed to set current assignment for task {}.", taskId, e);
-      return new TaskAssignResult(taskConfig, this, false, fitnessScoreFactor,
-          null, "Recovered TaskAssignResult from current state");
+      logger.error("Failed to restore current TaskAssignResult for task {}.", taskId, e);
+      return new TaskAssignResult(taskConfig, quotaType, this, false, fitnessScoreFactor, null,
+          "Recovered TaskAssignResult from current state");
     }
     return assignResult;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/304a8f86/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
index f81749c..05622e8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
@@ -46,15 +46,17 @@ public class TaskAssignResult implements Comparable<TaskAssignResult> {
   private final AssignableInstance _node;
   private final TaskConfig _taskConfig;
   private final String _description;
+  private final String _quotaType;
 
-  public TaskAssignResult(TaskConfig taskConfig, AssignableInstance node, boolean isSuccessful,
-      int fitness, FailureReason reason, String description) {
+  public TaskAssignResult(TaskConfig taskConfig, String quotaType, AssignableInstance node,
+      boolean isSuccessful, int fitness, FailureReason reason, String description) {
     _isAssignmentSuccessful = isSuccessful;
     _fitnessScore = fitness;
     _reason = reason;
     _taskConfig = taskConfig;
     _node = node;
     _description = description;
+    _quotaType = quotaType;
   }
 
   /**
@@ -73,6 +75,15 @@ public class TaskAssignResult implements Comparable<TaskAssignResult> {
   }
 
   /**
+   * Returns the quota type of the underlying task being assigned. It is used at release time
+   * so that the resources are returned to the correct quota type.
+   * @return quota type of the task
+   */
+  public String getQuotaType() {
+    return _quotaType;
+  }
+
+  /**
    * Returns the name of the instance this task was assigned to.
    * @return instance name. Null if assignment was not successful
    */
@@ -98,8 +109,7 @@ public class TaskAssignResult implements Comparable<TaskAssignResult> {
 
   /**
    * Returns a one sentence description that carries detail information about
-   * assignment failure for debug purpose
-   *
+   * assignment failure for debug purpose.
    * @return description
    */
   public String getFailureDescription() {
@@ -134,4 +144,4 @@ public class TaskAssignResult implements Comparable<TaskAssignResult> {
     sb.append("}");
     return sb.toString();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/304a8f86/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java
index 79fbd64..b97ea89 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java
@@ -27,11 +27,24 @@ public interface TaskAssigner {
   /**
    * Assign a collection of tasks on a collection of assignableInstances.
    * When an assignment decision is made, AssignableInstance.assign() must be called for the
-   * instance to modify its internal capacity profile.
-   * @param assignableInstances String -> AssignableInstanceMapping
-   * @param tasks String -> TaskConfig
-   * @return taskID -> TaskAssignmentResult mapping per task
+   * instance to modify its internal capacity profile. Note that all tasks will be treated as
+   * belonging to the DEFAULT quota type.
+   * @param assignableInstances AssignableInstances
+   * @param tasks TaskConfigs of the same quota type
+   * @return taskID -> TaskAssignResult mappings
    */
   Map<String, TaskAssignResult> assignTasks(Iterable<AssignableInstance> assignableInstances,
       Iterable<TaskConfig> tasks);
-}
+
+  /**
+   * Assign a collection of tasks on a collection of assignableInstances.
+   * When an assignment decision is made, AssignableInstance.assign() must be called for the
+   * instance to modify its internal capacity profile.
+   * @param assignableInstances AssignableInstances
+   * @param tasks TaskConfigs of the same quota type
+   * @param quotaType quota type of the tasks
+   * @return taskID -> TaskAssignResult mappings
+   */
+  Map<String, TaskAssignResult> assignTasks(Iterable<AssignableInstance> assignableInstances,
+      Iterable<TaskConfig> tasks, String quotaType);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/304a8f86/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
index ece7290..fee54e5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/ThreadCountBasedTaskAssigner.java
@@ -24,45 +24,53 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.PriorityQueue;
-import java.util.Random;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.task.TaskConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ThreadCountBasedTaskAssigner implements TaskAssigner {
-  private static final Logger logger =
-      LoggerFactory.getLogger(ThreadCountBasedTaskAssigner.class);
+  private static final Logger logger = LoggerFactory.getLogger(ThreadCountBasedTaskAssigner.class);
 
   private static final int SCHED_QUEUE_INIT_CAPACITY = 200;
+  private static final String DEFAULT_QUOTA_TYPE = "DEFAULT";
+
+  /**
+   * Assigns given tasks to given AssignableInstances assuming the DEFAULT quota type for all tasks.
+   * @param assignableInstances AssignableInstances
+   * @param tasks TaskConfigs of the same quota type
+   * @return taskID -> TaskAssignResult mappings
+   */
+  public Map<String, TaskAssignResult> assignTasks(Iterable<AssignableInstance> assignableInstances,
+      Iterable<TaskConfig> tasks) {
+    return assignTasks(assignableInstances, tasks, DEFAULT_QUOTA_TYPE);
+  }
 
   /**
    * This is a simple task assigning algorithm that uses the following assumptions to achieve
    * efficiency in assigning tasks:
-   *    1. All tasks have same quota type
-   *    2. All tasks only need 1 thread for assignment, no other things to consider
-   *
+   * 1. All tasks have same quota type
+   * 2. All tasks only need 1 thread for assignment, no other things to consider
    * The algorithm ensures the spread-out of tasks with same quota type or tasks from same job, with
    * best effort.
    * NOTE: once we have more things to consider during scheduling, we will need to come up with
-   * a more generic task assignment algorithm
-   * @param assignableInstances String -> AssignableInstanceMapping
-   * @param tasks String -> TaskConfig
-   * @return taskID -> TaskAssignmentResult mapping per task
+   * a more generic task assignment algorithm.
+   * @param assignableInstances AssignableInstances
+   * @param tasks TaskConfigs of the same quota type
+   * @param quotaType quota type of the tasks
+   * @return taskID -> TaskAssignResult mappings
    */
+  @Override
   public Map<String, TaskAssignResult> assignTasks(Iterable<AssignableInstance> assignableInstances,
-      Iterable<TaskConfig> tasks) {
+      Iterable<TaskConfig> tasks, String quotaType) {
     if (tasks == null || !tasks.iterator().hasNext()) {
       logger.warn("No task to assign!");
       return Collections.emptyMap();
     }
     if (assignableInstances == null || !assignableInstances.iterator().hasNext()) {
       logger.warn("No instance to assign!");
-      return buildNoInstanceAssignment(tasks);
+      return buildNoInstanceAssignment(tasks, quotaType);
     }
-
-    // get quota type
-    String quotaType = tasks.iterator().next().getQuotaType();
     logger.info("Assigning tasks with quota type {}", quotaType);
 
     // Build a sched queue
@@ -83,17 +91,17 @@ public class ThreadCountBasedTaskAssigner implements TaskAssigner {
       // we assume all subsequent tasks will fail with same reason
       if (lastFailure != null) {
         assignResults.put(task.getId(),
-            new TaskAssignResult(task, null, false, lastFailure.getFitnessScore(),
+            new TaskAssignResult(task, quotaType, null, false, lastFailure.getFitnessScore(),
                 lastFailure.getFailureReason(), lastFailure.getFailureDescription()));
         continue;
       }
 
       // Try to assign the task to least used instance
       AssignableInstance instance = queue.poll();
-      TaskAssignResult result = instance.tryAssign(task);
+      TaskAssignResult result = instance.tryAssign(task, quotaType);
       assignResults.put(task.getId(), result);
 
-      if (!result.isSuccessful()){
+      if (!result.isSuccessful()) {
         // For all failure reasons other than duplicated assignment, we can fail
         // subsequent tasks
         lastFailure = result;
@@ -120,10 +128,11 @@ public class ThreadCountBasedTaskAssigner implements TaskAssigner {
     return queue;
   }
 
-  private Map<String, TaskAssignResult> buildNoInstanceAssignment(Iterable<TaskConfig> tasks) {
+  private Map<String, TaskAssignResult> buildNoInstanceAssignment(Iterable<TaskConfig> tasks,
+      String quotaType) {
     Map<String, TaskAssignResult> result = new HashMap<>();
     for (TaskConfig taskConfig : tasks) {
-      result.put(taskConfig.getId(), new TaskAssignResult(taskConfig, null, false, 0,
+      result.put(taskConfig.getId(), new TaskAssignResult(taskConfig, quotaType, null, false, 0,
           TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA, "No assignable instance to assign"));
     }
     return result;
@@ -149,7 +158,6 @@ public class ThreadCountBasedTaskAssigner implements TaskAssigner {
      * Using this comparator, AssignableInstance will be sorted based on availability of
      * quota given job type in the priority queue. Top of the queue will be the one with
      * highest priority
-     *
      * @return a negative integer, zero, or a positive integer as the
      *         first argument is less than, equal to, or greater than the
      *         second
@@ -163,12 +171,12 @@ public class ThreadCountBasedTaskAssigner implements TaskAssigner {
 
     private Integer getRemainingUsage(Map<String, Map<String, Integer>> capacity,
         Map<String, Map<String, Integer>> used) {
-      if (capacity.containsKey(RESOURCE_TYPE) && capacity.get(RESOURCE_TYPE)
-          .containsKey(_quotaType)) {
-        return capacity.get(RESOURCE_TYPE).get(_quotaType) - used.get(RESOURCE_TYPE)
-            .get(_quotaType);
+      if (capacity.containsKey(RESOURCE_TYPE)
+          && capacity.get(RESOURCE_TYPE).containsKey(_quotaType)) {
+        return capacity.get(RESOURCE_TYPE).get(_quotaType)
+            - used.get(RESOURCE_TYPE).get(_quotaType);
       }
       return 0;
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/304a8f86/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java
index c17f4eb..b5d5e44 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManager.java
@@ -36,7 +36,6 @@ import org.apache.helix.task.assigner.TaskAssignResult;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import sun.security.jca.GetInstance;
 
 public class TestAssignableInstanceManager {
   private static final int NUM_PARTICIPANTS = 3;

http://git-wip-us.apache.org/repos/asf/helix/blob/304a8f86/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java
index b05e049..f971820 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestAssignableInstanceManagerControllerSwitch.java
@@ -71,7 +71,7 @@ public class TestAssignableInstanceManagerControllerSwitch extends TaskTestBase
         accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs(), true);
 
     // Wait for the job pipeline
-    Thread.sleep(2000);
+    Thread.sleep(100);
     taskDataCache.refresh(accessor, resourceConfigMap);
 
     // Create prev manager and build
@@ -86,7 +86,7 @@ public class TestAssignableInstanceManagerControllerSwitch extends TaskTestBase
     // Stop the current controller
     _controller.syncStop();
     // Start a new controller
-    String newControllerName = CONTROLLER_PREFIX + "_1";
+    String newControllerName = CONTROLLER_PREFIX + "_2";
     _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, newControllerName);
     _controller.syncStart();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/304a8f86/helix-core/src/test/java/org/apache/helix/task/assigner/AssignerTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/AssignerTestBase.java b/helix-core/src/test/java/org/apache/helix/task/assigner/AssignerTestBase.java
index 4030df7..5049df7 100644
--- a/helix-core/src/test/java/org/apache/helix/task/assigner/AssignerTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/task/assigner/AssignerTestBase.java
@@ -41,7 +41,8 @@ import org.apache.helix.task.TaskConfig;
     return createLiveInstance(resourceTypes, resourceCapacity, testInstanceName);
   }
 
-  /* package */ LiveInstance createLiveInstance(String[] resourceTypes, String[] resourceCapacity, String instancename) {
+  /* package */ LiveInstance createLiveInstance(String[] resourceTypes, String[] resourceCapacity,
+      String instancename) {
     LiveInstance li = new LiveInstance(instancename);
     if (resourceCapacity != null && resourceTypes != null) {
       Map<String, String> resMap = new HashMap<>();
@@ -62,8 +63,8 @@ import org.apache.helix.task.TaskConfig;
       }
     }
     if (addDefaultQuota) {
-      clusterConfig.setTaskQuotaRatio(TaskConfig.DEFAULT_QUOTA_TYPE, defaultQuotaRatio);
+      clusterConfig.setTaskQuotaRatio(AssignableInstance.DEFAULT_QUOTA_TYPE, defaultQuotaRatio);
     }
     return clusterConfig;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/304a8f86/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
index a1a5852..0f12aea 100644
--- a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
@@ -30,7 +30,6 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
 
-
 public class TestAssignableInstance extends AssignerTestBase {
 
   @Test
@@ -56,64 +55,55 @@ public class TestAssignableInstance extends AssignerTestBase {
   @Test
   public void testInitializationWithQuotaUnset() {
     // Initialize AssignableInstance with neither resource capacity nor quota ratio provided
-    AssignableInstance ai = new AssignableInstance(
-        createClusterConfig(null, null, false),
-        new InstanceConfig(testInstanceName),
-        createLiveInstance(null, null)
-    );
+    AssignableInstance ai = new AssignableInstance(createClusterConfig(null, null, false),
+        new InstanceConfig(testInstanceName), createLiveInstance(null, null));
     Assert.assertEquals(ai.getUsedCapacity().size(), 1);
     Assert.assertEquals(
         (int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
-            .get(TaskConfig.DEFAULT_QUOTA_TYPE), 0);
+            .get(AssignableInstance.DEFAULT_QUOTA_TYPE),
+        0);
     Assert.assertEquals(
         (int) ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
-            .get(TaskConfig.DEFAULT_QUOTA_TYPE), TaskStateModelFactory.TASK_THREADPOOL_SIZE);
+            .get(AssignableInstance.DEFAULT_QUOTA_TYPE),
+        TaskStateModelFactory.TASK_THREADPOOL_SIZE);
     Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
   }
 
   @Test
   public void testInitializationWithOnlyCapacity() {
     // Initialize AssignableInstance with only resource capacity provided
-    AssignableInstance ai = new AssignableInstance(
-        createClusterConfig(null, null, false),
+    AssignableInstance ai = new AssignableInstance(createClusterConfig(null, null, false),
         new InstanceConfig(testInstanceName),
-        createLiveInstance(testResourceTypes, testResourceCapacity)
-    );
+        createLiveInstance(testResourceTypes, testResourceCapacity));
     Assert.assertEquals(ai.getTotalCapacity().size(), testResourceTypes.length);
     Assert.assertEquals(ai.getUsedCapacity().size(), testResourceTypes.length);
     for (int i = 0; i < testResourceTypes.length; i++) {
       Assert.assertEquals(ai.getTotalCapacity().get(testResourceTypes[i]).size(), 1);
       Assert.assertEquals(ai.getUsedCapacity().get(testResourceTypes[i]).size(), 1);
       Assert.assertEquals(
-          ai.getTotalCapacity().get(testResourceTypes[i]).get(TaskConfig.DEFAULT_QUOTA_TYPE),
-          Integer.valueOf(testResourceCapacity[i])
-      );
+          ai.getTotalCapacity().get(testResourceTypes[i]).get(AssignableInstance.DEFAULT_QUOTA_TYPE),
+          Integer.valueOf(testResourceCapacity[i]));
       Assert.assertEquals(
-          ai.getUsedCapacity().get(testResourceTypes[i]).get(TaskConfig.DEFAULT_QUOTA_TYPE),
-          Integer.valueOf(0)
-      );
+          ai.getUsedCapacity().get(testResourceTypes[i]).get(AssignableInstance.DEFAULT_QUOTA_TYPE),
+          Integer.valueOf(0));
     }
   }
 
   @Test
   public void testInitializationWithOnlyQuotaType() {
     // Initialize AssignableInstance with only quota type provided
-    AssignableInstance ai = new AssignableInstance(
-        createClusterConfig(testQuotaTypes, testQuotaRatio, false),
-        new InstanceConfig(testInstanceName),
-        createLiveInstance(null, null)
-    );
+    AssignableInstance ai =
+        new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, false),
+            new InstanceConfig(testInstanceName), createLiveInstance(null, null));
 
     Assert.assertEquals(ai.getTotalCapacity().size(), 1);
     Assert.assertEquals(ai.getUsedCapacity().size(), 1);
     Assert.assertEquals(
         ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()).size(),
-        testQuotaTypes.length
-    );
+        testQuotaTypes.length);
     Assert.assertEquals(
         ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()).size(),
-        testQuotaTypes.length
-    );
+        testQuotaTypes.length);
     Assert.assertEquals(
         ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()),
         calculateExpectedQuotaPerType(TaskStateModelFactory.TASK_THREADPOOL_SIZE, testQuotaTypes,
@@ -124,14 +114,15 @@ public class TestAssignableInstance extends AssignerTestBase {
   @Test
   public void testInitializationWithQuotaAndCapacity() {
     // Initialize AssignableInstance with both capacity and quota type provided
-    AssignableInstance ai = new AssignableInstance(
-        createClusterConfig(testQuotaTypes, testQuotaRatio, false),
-        new InstanceConfig(testInstanceName),
-        createLiveInstance(testResourceTypes, testResourceCapacity)
-    );
+    AssignableInstance ai =
+        new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, false),
+            new InstanceConfig(testInstanceName),
+            createLiveInstance(testResourceTypes, testResourceCapacity));
 
     Map<String, Integer> usedResourcePerType =
-        createResourceQuotaPerTypeMap(testQuotaTypes, new int[] { 0, 0, 0 });
+        createResourceQuotaPerTypeMap(testQuotaTypes, new int[] {
+            0, 0, 0
+        });
     for (int i = 0; i < testResourceTypes.length; i++) {
       Assert.assertEquals(ai.getTotalCapacity().get(testResourceTypes[i]),
           calculateExpectedQuotaPerType(Integer.valueOf(testResourceCapacity[i]), testQuotaTypes,
@@ -142,17 +133,24 @@ public class TestAssignableInstance extends AssignerTestBase {
 
   @Test
   public void testAssignableInstanceUpdateConfigs() {
-    AssignableInstance ai = new AssignableInstance(
-        createClusterConfig(testQuotaTypes, testQuotaRatio, false),
-        new InstanceConfig(testInstanceName),
-        createLiveInstance(testResourceTypes, testResourceCapacity)
-    );
+    AssignableInstance ai =
+        new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, false),
+            new InstanceConfig(testInstanceName),
+            createLiveInstance(testResourceTypes, testResourceCapacity));
 
-    String[] newResources = new String[] {"Resource2", "Resource3", "Resource4"};
-    String[] newResourceCapacities = new String[] {"100", "150", "50"};
+    String[] newResources = new String[] {
+        "Resource2", "Resource3", "Resource4"
+    };
+    String[] newResourceCapacities = new String[] {
+        "100", "150", "50"
+    };
 
-    String[] newTypes = new String[] {"Type3", "Type4", "Type5", "Type6"};
-    String[] newTypeRatio = new String[] {"20", "40", "25", "25"};
+    String[] newTypes = new String[] {
+        "Type3", "Type4", "Type5", "Type6"
+    };
+    String[] newTypeRatio = new String[] {
+        "20", "40", "25", "25"
+    };
 
     LiveInstance newLiveInstance = createLiveInstance(newResources, newResourceCapacities);
     ClusterConfig newClusterConfig = createClusterConfig(newTypes, newTypeRatio, false);
@@ -162,28 +160,26 @@ public class TestAssignableInstance extends AssignerTestBase {
     Assert.assertEquals(ai.getTotalCapacity().size(), newResourceCapacities.length);
 
     for (int i = 0; i < newResources.length; i++) {
-      Assert.assertEquals(ai.getTotalCapacity().get(newResources[i]),
-          calculateExpectedQuotaPerType(Integer.valueOf(newResourceCapacities[i]), newTypes,
-              newTypeRatio));
+      Assert.assertEquals(ai.getTotalCapacity().get(newResources[i]), calculateExpectedQuotaPerType(
+          Integer.valueOf(newResourceCapacities[i]), newTypes, newTypeRatio));
       Assert.assertEquals(ai.getUsedCapacity().get(newResources[i]),
-          createResourceQuotaPerTypeMap(newTypes, new int[] { 0, 0, 0, 0 }));
+          createResourceQuotaPerTypeMap(newTypes, new int[] {
+              0, 0, 0, 0
+          }));
     }
   }
 
   @Test
   public void testNormalTryAssign() {
-    AssignableInstance ai = new AssignableInstance(
-        createClusterConfig(null, null, true),
-        new InstanceConfig(testInstanceName),
-        createLiveInstance(null, null)
-    );
+    AssignableInstance ai = new AssignableInstance(createClusterConfig(null, null, true),
+        new InstanceConfig(testInstanceName), createLiveInstance(null, null));
 
-    // When nothing is configured, we should use default quota to assign
+    // When nothing is configured, we should use default quota type to assign
     Map<String, TaskAssignResult> results = new HashMap<>();
     for (int i = 0; i < TaskStateModelFactory.TASK_THREADPOOL_SIZE; i++) {
       String taskId = Integer.toString(i);
       TaskConfig task = new TaskConfig("", null, taskId, null);
-      TaskAssignResult result = ai.tryAssign(task);
+      TaskAssignResult result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
       Assert.assertTrue(result.isSuccessful());
       ai.assign(result);
       results.put(taskId, result);
@@ -192,9 +188,10 @@ public class TestAssignableInstance extends AssignerTestBase {
     // We are out of quota now and we should not be able to assign
     String taskId = "TaskCannotAssign";
     TaskConfig task = new TaskConfig("", null, taskId, null);
-    TaskAssignResult result = ai.tryAssign(task);
+    TaskAssignResult result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
     Assert.assertFalse(result.isSuccessful());
-    Assert.assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA);
+    Assert.assertEquals(result.getFailureReason(),
+        TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA);
     try {
       ai.assign(result);
       Assert.fail("Expecting IllegalStateException");
@@ -203,18 +200,19 @@ public class TestAssignableInstance extends AssignerTestBase {
     }
 
     // After releasing 1 task, we should be able to schedule
-    ai.release(results.get("1").getTaskConfig());
-    result = ai.tryAssign(task);
+    ai.release(results.get("1").getTaskConfig(), AssignableInstance.DEFAULT_QUOTA_TYPE);
+    result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
     Assert.assertTrue(result.isSuccessful());
 
     // release all tasks, check remaining resources
     for (TaskAssignResult rst : results.values()) {
-      ai.release(rst.getTaskConfig());
+      ai.release(rst.getTaskConfig(), AssignableInstance.DEFAULT_QUOTA_TYPE);
     }
 
     Assert.assertEquals(
         (int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
-            .get(TaskConfig.DEFAULT_QUOTA_TYPE), 0);
+            .get(AssignableInstance.DEFAULT_QUOTA_TYPE),
+        0);
   }
 
   @Test
@@ -227,25 +225,27 @@ public class TestAssignableInstance extends AssignerTestBase {
     // No such resource type
     String taskId = "testTask";
     TaskConfig task = new TaskConfig("", null, taskId, "");
-    TaskAssignResult result = ai.tryAssign(task);
+    TaskAssignResult result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
     Assert.assertFalse(result.isSuccessful());
     Assert.assertEquals(result.getFailureReason(),
         TaskAssignResult.FailureReason.NO_SUCH_RESOURCE_TYPE);
 
     // No such quota type
-    ai.updateConfigs(null, null, createLiveInstance(
-        new String[] { LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name() },
-        new String[] { "1" }));
+    ai.updateConfigs(null, null, createLiveInstance(new String[] {
+        LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()
+    }, new String[] {
+        "1"
+    }));
 
-    result = ai.tryAssign(task);
+    result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
     Assert.assertFalse(result.isSuccessful());
-    Assert
-        .assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.NO_SUCH_QUOTA_TYPE);
+    Assert.assertEquals(result.getFailureReason(),
+        TaskAssignResult.FailureReason.NO_SUCH_QUOTA_TYPE);
 
     ai.updateConfigs(createClusterConfig(testQuotaTypes, testQuotaRatio, true), null, null);
 
-    task.setQuotaType(TaskConfig.DEFAULT_QUOTA_TYPE);
-    result = ai.tryAssign(task);
+    result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
+
     Assert.assertTrue(result.isSuccessful());
     ai.assign(result);
     try {
@@ -256,42 +256,50 @@ public class TestAssignableInstance extends AssignerTestBase {
     }
 
     // Duplicate assignment
-    result = ai.tryAssign(task);
+    result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
     Assert.assertFalse(result.isSuccessful());
-    Assert.assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.TASK_ALREADY_ASSIGNED);
+    Assert.assertEquals(result.getFailureReason(),
+        TaskAssignResult.FailureReason.TASK_ALREADY_ASSIGNED);
 
     // Insufficient quota
-    ai.release(task);
-    ai.updateConfigs(null, null, createLiveInstance(
-        new String[] { LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name() },
-        new String[] { "0" }));
+    ai.release(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
+    ai.updateConfigs(null, null, createLiveInstance(new String[] {
+        LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()
+    }, new String[] {
+        "0"
+    }));
+
+    result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
 
-    task.setQuotaType(TaskConfig.DEFAULT_QUOTA_TYPE);
-    result = ai.tryAssign(task);
     Assert.assertFalse(result.isSuccessful());
-    Assert
-        .assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA);
+    Assert.assertEquals(result.getFailureReason(),
+        TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA);
   }
 
   @Test
-  public void testSetCurrentAssignment() {
+  public void testRestoreTaskAssignResult() {
     AssignableInstance ai =
         new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, true),
             new InstanceConfig(testInstanceName), createLiveInstance(
-            new String[] { LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name() },
+            new String[] { LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()},
             new String[] { "40" }));
 
     Map<String, TaskConfig> currentAssignments = new HashMap<>();
-    currentAssignments.put("supportedTask", new TaskConfig("", null, "supportedTask", ""));
+    TaskConfig supportedTask = new TaskConfig("", null, "supportedTask", "");
+    currentAssignments.put("supportedTask", supportedTask);
+
     TaskConfig unsupportedTask = new TaskConfig("", null, "unsupportedTask", "");
-    unsupportedTask.setQuotaType("UnsupportedQuotaType");
     currentAssignments.put("unsupportedTask", unsupportedTask);
 
     Map<String, TaskAssignResult> results = Maps.newHashMap();
     for (Map.Entry<String, TaskConfig> entry : currentAssignments.entrySet()) {
       String taskID = entry.getKey();
       TaskConfig taskConfig = entry.getValue();
-      TaskAssignResult taskAssignResult = ai.restoreTaskAssignResult(taskID, taskConfig);
+      String quotaType = (taskID.equals("supportedTask")) ? AssignableInstance.DEFAULT_QUOTA_TYPE
+          : "UnsupportedQuotaType";
+      // Restore TaskAssignResult
+      TaskAssignResult taskAssignResult =
+          ai.restoreTaskAssignResult(taskID, taskConfig, quotaType);
       if (taskAssignResult.isSuccessful()) {
         results.put(taskID, taskAssignResult);
       }
@@ -304,7 +312,8 @@ public class TestAssignableInstance extends AssignerTestBase {
     Assert.assertEquals(ai.getCurrentAssignments().size(), 2);
     Assert.assertEquals(
         (int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
-            .get(TaskConfig.DEFAULT_QUOTA_TYPE), 1);
+            .get(AssignableInstance.DEFAULT_QUOTA_TYPE),
+        1);
   }
 
   private Map<String, Integer> createResourceQuotaPerTypeMap(String[] types, int[] quotas) {
@@ -326,8 +335,7 @@ public class TestAssignableInstance extends AssignerTestBase {
 
     for (int i = 0; i < quotaRatios.length; i++) {
       expectedQuotaPerType.put(quotaTypes[i],
-          Math.round((float)capacity * Integer.valueOf(quotaRatios[i])
-              / totalQuota));
+          Math.round((float) capacity * Integer.valueOf(quotaRatios[i]) / totalQuota));
     }
     return expectedQuotaPerType;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/304a8f86/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java
index ec8753c..b57b4e2 100644
--- a/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java
+++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestThreadCountBasedTaskAssigner.java
@@ -42,10 +42,10 @@ public class TestThreadCountBasedTaskAssigner extends AssignerTestBase {
 
     for (String quotaType : testQuotaTypes) {
       // Create tasks
-      List<TaskConfig> tasks = createTaskConfigs(taskCountPerType, quotaType);
+      List<TaskConfig> tasks = createTaskConfigs(taskCountPerType);
 
       // Assign
-      Map<String, TaskAssignResult> results = assigner.assignTasks(instances, tasks);
+      Map<String, TaskAssignResult> results = assigner.assignTasks(instances, tasks, quotaType);
 
       // Check success
       assertAssignmentResults(results.values(), true);
@@ -64,9 +64,9 @@ public class TestThreadCountBasedTaskAssigner extends AssignerTestBase {
   public void testAssignmentFailureNoInstance() {
     TaskAssigner assigner = new ThreadCountBasedTaskAssigner();
     int taskCount = 10;
-    List<TaskConfig> tasks = createTaskConfigs(taskCount, "Dummy");
+    List<TaskConfig> tasks = createTaskConfigs(taskCount);
     Map<String, TaskAssignResult> results =
-        assigner.assignTasks(Collections.<AssignableInstance>emptyList(), tasks);
+        assigner.assignTasks(Collections.<AssignableInstance>emptyList(), tasks, "Dummy");
     Assert.assertEquals(results.size(), taskCount);
     for (TaskAssignResult result : results.values()) {
       Assert.assertFalse(result.isSuccessful());
@@ -91,9 +91,9 @@ public class TestThreadCountBasedTaskAssigner extends AssignerTestBase {
 
     // 10 * Type1 quota
     List<AssignableInstance> instances = createAssignableInstances(2, 10);
-    List<TaskConfig> tasks = createTaskConfigs(20, testQuotaTypes[0]);
+    List<TaskConfig> tasks = createTaskConfigs(20);
 
-    Map<String, TaskAssignResult> results = assigner.assignTasks(instances, tasks);
+    Map<String, TaskAssignResult> results = assigner.assignTasks(instances, tasks, testQuotaTypes[0]);
     int successCnt = 0;
     int failCnt = 0;
     for (TaskAssignResult rst : results.values()) {
@@ -113,13 +113,13 @@ public class TestThreadCountBasedTaskAssigner extends AssignerTestBase {
   public void testAssignmentFailureDuplicatedTask() {
     TaskAssigner assigner = new ThreadCountBasedTaskAssigner();
     List<AssignableInstance> instances = createAssignableInstances(1, 20);
-    List<TaskConfig> tasks = createTaskConfigs(10, testQuotaTypes[0], false);
+    List<TaskConfig> tasks = createTaskConfigs(10, false);
 
     // Duplicate all tasks
-    tasks.addAll(createTaskConfigs(10, testQuotaTypes[0], false));
+    tasks.addAll(createTaskConfigs(10, false));
     Collections.shuffle(tasks);
 
-    Map<String, TaskAssignResult> results = assigner.assignTasks(instances, tasks);
+    Map<String, TaskAssignResult> results = assigner.assignTasks(instances, tasks, testQuotaTypes[0]);
     Assert.assertEquals(results.size(), 10);
     assertAssignmentResults(results.values(), true);
   }
@@ -142,14 +142,14 @@ public class TestThreadCountBasedTaskAssigner extends AssignerTestBase {
 
       // 50 * instanceCount number of tasks
       List<AssignableInstance> instances = createAssignableInstances(instanceCount, 100);
-      List<TaskConfig> tasks = createTaskConfigs(taskCount, testQuotaTypes[0]);
+      List<TaskConfig> tasks = createTaskConfigs(taskCount);
       List<Map<String, TaskAssignResult>> allResults = new ArrayList<>();
 
       // Assign
       long start = System.currentTimeMillis();
       for (int j = 0; j < taskCount / assignBatchSize; j++) {
         allResults.add(assigner
-            .assignTasks(instances, tasks.subList(j * assignBatchSize, (j + 1) * assignBatchSize)));
+            .assignTasks(instances, tasks.subList(j * assignBatchSize, (j + 1) * assignBatchSize), testQuotaTypes[0]));
       }
       long duration = System.currentTimeMillis() - start;
       totalTime += duration;
@@ -170,16 +170,15 @@ public class TestThreadCountBasedTaskAssigner extends AssignerTestBase {
     }
   }
 
-  private List<TaskConfig> createTaskConfigs(int count, String quotaType) {
-    return createTaskConfigs(count, quotaType, true);
+  private List<TaskConfig> createTaskConfigs(int count) {
+    return createTaskConfigs(count, true);
   }
 
-  private List<TaskConfig> createTaskConfigs(int count, String quotaType, boolean randomID) {
+  private List<TaskConfig> createTaskConfigs(int count, boolean randomID) {
     List<TaskConfig> tasks = new ArrayList<>();
     for (int i = 0; i < count; i++) {
       TaskConfig task =
           new TaskConfig(null, null, randomID ? UUID.randomUUID().toString() : "task-" + i, null);
-      task.setQuotaType(quotaType);
       tasks.add(task);
     }
     return tasks;
@@ -203,4 +202,4 @@ public class TestThreadCountBasedTaskAssigner extends AssignerTestBase {
     }
     return instances;
   }
-}
+}
\ No newline at end of file