Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/16 18:22:24 UTC

[4/4] helix git commit: [HELIX-730] Add ThreadCountBasedAssignmentCalculator and integrate with Workflow/JobRebalancer and fix rebalancing logic

[HELIX-730] Add ThreadCountBasedAssignmentCalculator and integrate with Workflow/JobRebalancer and fix rebalancing logic

For quota-based scheduling of tasks, we have added the TaskAssigner interface, which takes AssignableInstances into account by way of AssignableInstanceManager. To use this in the existing pipeline prior to Task Framework 2.0, GenericTaskAssignmentCalculator was replaced with ThreadCountBasedAssignmentCalculator, a wrapper around TaskAssigner, and the necessary adjustments were made in Workflow/JobRebalancer for this replacement. The rebalancing logic in Workflow/JobRebalancer was also reviewed and fixed. Additionally, TestQuotaBasedScheduling was added to test quota-based task scheduling. Note that quotas will apply to both generic and targeted jobs.
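
As a rough illustration of how the quota config is set up (a minimal sketch, not part of this commit; the cluster name, the "Backup" quota type, and the ratio values are made up), the ClusterConfig quota APIs touched by this change can be exercised like this:

    import org.apache.helix.model.ClusterConfig;

    public class QuotaConfigExample {
      public static void main(String[] args) {
        // Build a ClusterConfig in memory and register two quota types.
        ClusterConfig clusterConfig = new ClusterConfig("MyCluster");
        clusterConfig.setTaskQuotaRatio("DEFAULT", 20);   // int overload added in this change
        clusterConfig.setTaskQuotaRatio("Backup", "10");  // String overload; must parse to an int
        System.out.println("Backup quota ratio: " + clusterConfig.getTaskQuotaRatio("Backup"));
      }
    }

The config would then be persisted to the cluster (e.g. via ConfigAccessor) before submitting jobs whose JobConfig carries a matching quotaType.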

A few bugs were uncovered during this process, such as faulty retry logic that never actually restarted tasks. For more details, see the changelist below:

Changelist:
    1. Add ThreadCountBasedAssignmentCalculator, a wrapper around ThreadCountBasedTaskAssigner
    2. Make logic changes in JobRebalancer to enable the use of ThreadCountBasedAssignmentCalculator
    3. Fix the failing test by using a thread-safe map and rename TestGenericTaskAssignmentCalculator to TestTaskAssignmentCalculator to better reflect what its tests are doing
    4. Add retry logic that was previously absent for INIT and DROPPED tasks in JobRebalancer
    5. Add TestQuotaBasedScheduling to verify that jobs and tasks are assigned and scheduled per the quota config set in ClusterConfig
    6. Add more log messages to aid with task-scheduling debugging in AssignableInstance
    7. In AbstractTaskDispatcher, implement retry logic for tasks in STOPPED, TIMED_OUT, or TASK_ERROR states so that they are restarted correctly
    8. In AbstractTaskDispatcher, fix the overlapAssign enforcement for jobs with isAllowOverlapAssignment() so that only jobs whose state is IN_PROGRESS are considered
    9. In AbstractTaskDispatcher, modify isWorkflowFinished() so that non-active jobs have their tasks' resources freed from AssignableInstances to prevent resource leaks (see the sketch after this list)
   10. In markJobFailed() and markJobCompleted(), free non-active jobs' task resources from AssignableInstances to prevent resource leaks
   11. Fix the logic so that quotas do not apply to targeted jobs
   12. Fix TestTaskRebalancer (assumes Consistent Hashing, which is no longer used)
   13. Fix TestIndependentTaskRebalancer (assumes Consistent Hashing, no longer used)
   14. Improve the assignment logic so that incomplete tasks whose assigned participants are no longer live are re-assigned accordingly
   15. Fix TestTaskRebalanceFailover (tasks on non-live instances will be re-assigned promptly)
   16. Fix TestRebalanceRunningTask (targeted jobs will get tasks reassigned upon liveInstance and currentState change)
   17. Fix a bug in FixedAssignmentCalculator and assignment logic for targeted jobs such that a task index will no longer be assigned multiple times
   18. Fix TestJobFailureTaskNotStarted (tasks were not being assigned at all due to having reached maximum capacity for quota)
   19. Add a targetedTaskConfigMap field in JobConfig to cache TaskConfig objects for targeted tasks, reducing object creation and GC overhead
   20. Fix JobConfig so that it doesn't write quotaType to ZooKeeper when quotaType is null or not set
   21. Fix deleteWorkflow() in TaskUtil so that the earliest delete failure causes the entire method to be treated as failed (returning early to avoid leaving other ZNodes broken by an incomplete deletion)
   22. Fix TestDeleteWorkflow by adding another removeProperty() clause to lower failure rate
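
To make the resource-release behavior in items 7-10 concrete, the fix roughly follows the shape below (a simplified sketch, not the actual dispatcher code: the isTerminal() helper and the wrapper class are hypothetical, while AssignableInstance.release(TaskConfig, String) is the call the dispatcher actually makes):

    import java.util.Map;

    import org.apache.helix.task.TaskConfig;
    import org.apache.helix.task.TaskPartitionState;
    import org.apache.helix.task.assigner.AssignableInstance;

    // Simplified sketch of the "release quota on terminal task states" idea used in
    // AbstractTaskDispatcher. isTerminal() and this wrapper class are hypothetical.
    class QuotaReleaseSketch {
      static boolean isTerminal(TaskPartitionState state) {
        return state == TaskPartitionState.COMPLETED || state == TaskPartitionState.TIMED_OUT
            || state == TaskPartitionState.TASK_ERROR || state == TaskPartitionState.TASK_ABORTED
            || state == TaskPartitionState.DROPPED;
      }

      static void releaseIfTerminal(Map<String, AssignableInstance> assignableInstanceMap,
          String instance, TaskConfig taskConfig, String quotaType, TaskPartitionState state) {
        if (isTerminal(state)) {
          // Free the thread/quota this task was holding so that the AssignableInstance's
          // capacity is not leaked and new tasks can be scheduled in its place.
          assignableInstanceMap.get(instance).release(taskConfig, quotaType);
        }
      }
    }

In the actual dispatcher the release happens per state in the switch over the task's current state, but the effect is the same: any task that will not run again gives its quota back to its AssignableInstance.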


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4db61b56
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4db61b56
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4db61b56

Branch: refs/heads/master
Commit: 4db61b56e473b64ec9956f694dd2ac6a8d328ed4
Parents: 5e4e26c
Author: Hunter Lee <na...@gmail.com>
Authored: Fri Jul 13 14:45:41 2018 -0700
Committer: Hunter Lee <na...@gmail.com>
Committed: Fri Jul 13 17:38:58 2018 -0700

----------------------------------------------------------------------
 .../controller/stages/ClusterDataCache.java     | 158 +++--
 .../org/apache/helix/model/ClusterConfig.java   |  75 ++-
 .../helix/task/AbstractTaskDispatcher.java      | 587 ++++++++++++-----
 .../helix/task/AssignableInstanceManager.java   |  16 +-
 .../FixedTargetTaskAssignmentCalculator.java    | 177 ++++-
 .../java/org/apache/helix/task/JobConfig.java   |  40 +-
 .../org/apache/helix/task/JobRebalancer.java    | 113 ++--
 .../helix/task/TaskAssignmentCalculator.java    |   4 +-
 .../org/apache/helix/task/TaskRebalancer.java   |  29 +-
 .../java/org/apache/helix/task/TaskUtil.java    | 282 ++++----
 ...hreadCountBasedTaskAssignmentCalculator.java | 160 +++++
 .../apache/helix/task/WorkflowRebalancer.java   |  85 ++-
 .../helix/task/assigner/AssignableInstance.java |  51 +-
 .../assigner/ThreadCountBasedTaskAssigner.java  |   9 +
 .../integration/TestBatchEnableInstances.java   |  12 +-
 .../TestStateTransitionCancellation.java        |   6 +-
 .../controller/TestClusterMaintenanceMode.java  |   4 +-
 .../controller/TestTargetExternalView.java      |   4 +-
 .../integration/manager/TestZkHelixAdmin.java   |  18 +-
 .../integration/task/TestDeleteWorkflow.java    |  13 +-
 .../TestGenericTaskAssignmentCalculator.java    | 200 ------
 .../task/TestIndependentTaskRebalancer.java     |  64 +-
 .../helix/integration/task/TestJobFailure.java  |   4 +-
 .../task/TestJobFailureHighThreshold.java       |   4 +-
 .../task/TestJobFailureTaskNotStarted.java      |  13 +-
 .../helix/integration/task/TestJobTimeout.java  |   4 +-
 .../task/TestJobTimeoutTaskNotStarted.java      |  27 +-
 .../task/TestQuotaBasedScheduling.java          | 654 +++++++++++++++++++
 .../task/TestRebalanceRunningTask.java          | 133 ++--
 .../integration/task/TestStopWorkflow.java      |   4 +-
 .../task/TestTaskAssignmentCalculator.java      | 241 +++++++
 .../integration/task/TestTaskRebalancer.java    |  13 +-
 .../task/TestTaskRebalancerFailover.java        |   6 +-
 .../task/TestTaskRebalancerRetryLimit.java      |   4 +-
 .../integration/task/TestTaskRetryDelay.java    |   5 +-
 .../task/TestWorkflowJobDependency.java         |   4 +-
 .../task/TestWorkflowTermination.java           |   4 +-
 .../integration/task/TestWorkflowTimeout.java   |   2 +-
 .../helix/task/TaskSynchronizedTestBase.java    |  13 +-
 .../helix/task/TestSemiAutoStateTransition.java |   6 +-
 40 files changed, 2317 insertions(+), 931 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index ca2fa76..1f32c04 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -96,17 +96,15 @@ public class ClusterDataCache {
 
   // maintain a cache of bestPossible assignment across pipeline runs
   // TODO: this is only for customRebalancer, remove it and merge it with _idealMappingCache.
-  private Map<String, ResourceAssignment>  _resourceAssignmentCache = new HashMap<>();
-
+  private Map<String, ResourceAssignment> _resourceAssignmentCache = new HashMap<>();
 
   // maintain a cache of idealmapping (preference list) for full-auto resource across pipeline runs
-  private Map<String, ZNRecord>  _idealMappingCache = new HashMap<>();
+  private Map<String, ZNRecord> _idealMappingCache = new HashMap<>();
 
   private Map<ChangeType, Boolean> _propertyDataChangedMap;
 
   private Map<String, Integer> _participantActiveTaskCount = new HashMap<>();
 
-
   private ExecutorService _asyncTasksThreadPool;
 
   boolean _updateInstanceOfflineTime = true;
@@ -115,7 +113,10 @@ public class ClusterDataCache {
 
   private String _clusterName;
 
-  public ClusterDataCache () {
+  // For detecting liveinstance and target resource partition state change in task assignment
+  private boolean _existsLiveInstanceOrCurrentStateChange = false;
+
+  public ClusterDataCache() {
     this(null);
   }
 
@@ -146,33 +147,30 @@ public class ClusterDataCache {
       clearCachedResourceAssignments();
       _idealStateCache.refresh(accessor);
       LogUtil.logInfo(LOG, _eventId,
-          "Refresh IdealStates for cluster " + _clusterName + ", took " + (
-              System.currentTimeMillis() - startTime) + " ms for " + (_isTaskCache
-              ? "TASK"
-              : "DEFAULT") + "pipeline");
+          "Refresh IdealStates for cluster " + _clusterName + ", took "
+              + (System.currentTimeMillis() - startTime) + " ms for "
+              + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
     }
 
     if (_propertyDataChangedMap.get(ChangeType.LIVE_INSTANCE)) {
+      _existsLiveInstanceOrCurrentStateChange = true;
       startTime = System.currentTimeMillis();
       _propertyDataChangedMap.put(ChangeType.LIVE_INSTANCE, false);
       clearCachedResourceAssignments();
       _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances(), true);
       _updateInstanceOfflineTime = true;
       LogUtil.logInfo(LOG, _eventId,
-          "Refresh LiveInstances for cluster " + _clusterName + ", took " + (
-              System.currentTimeMillis() - startTime) + " ms for " + (_isTaskCache
-              ? "TASK"
-              : "DEFAULT") + "pipeline");
+          "Refresh LiveInstances for cluster " + _clusterName + ", took "
+              + (System.currentTimeMillis() - startTime) + " ms for "
+              + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
     }
 
     if (_propertyDataChangedMap.get(ChangeType.INSTANCE_CONFIG)) {
       _propertyDataChangedMap.put(ChangeType.INSTANCE_CONFIG, false);
       clearCachedResourceAssignments();
       _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs(), true);
-      LogUtil.logInfo(LOG, _eventId,
-          "Reload InstanceConfig: " + _instanceConfigCacheMap.keySet() + " for " + (_isTaskCache
-              ? "TASK"
-              : "DEFAULT") + "pipeline");
+      LogUtil.logInfo(LOG, _eventId, "Reload InstanceConfig: " + _instanceConfigCacheMap.keySet()
+          + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
     }
 
     if (_propertyDataChangedMap.get(ChangeType.RESOURCE_CONFIG)) {
@@ -180,11 +178,15 @@ public class ClusterDataCache {
       clearCachedResourceAssignments();
       _resourceConfigCacheMap =
           accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs(), true);
-      LogUtil.logInfo(LOG, _eventId,
-          "Reload ResourceConfigs: " + _resourceConfigCacheMap.keySet() + " for " + (_isTaskCache
-              ? "TASK"
-              : "DEFAULT") + "pipeline");
+      LogUtil.logInfo(LOG, _eventId, "Reload ResourceConfigs: " + _resourceConfigCacheMap.keySet()
+          + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
+    }
 
+    // This is for target jobs' task assignment. It needs to watch for current state changes for
+    // when targeted resources' state transitions complete
+    if (_propertyDataChangedMap.get(ChangeType.CURRENT_STATE)) {
+      _existsLiveInstanceOrCurrentStateChange = true;
+      _propertyDataChangedMap.put(ChangeType.CURRENT_STATE, false);
     }
 
     _liveInstanceMap = new HashMap<>(_liveInstanceCacheMap);
@@ -212,7 +214,8 @@ public class ClusterDataCache {
 
     // current state must be refreshed before refreshing relay messages
     // because we need to use current state to validate all relay messages.
-    _instanceMessagesCache.updateRelayMessages(_liveInstanceMap, _currentStateCache.getCurrentStatesMap());
+    _instanceMessagesCache.updateRelayMessages(_liveInstanceMap,
+        _currentStateCache.getCurrentStatesMap());
 
     if (_clusterConfig != null) {
       _idealStateRuleMap = _clusterConfig.getIdealStateRules();
@@ -229,26 +232,25 @@ public class ClusterDataCache {
 
     long endTime = System.currentTimeMillis();
     LogUtil.logInfo(LOG, _eventId,
-        "END: ClusterDataCache.refresh() for cluster " + getClusterName() + ", took " + (endTime
-            - startTime) + " ms for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
+        "END: ClusterDataCache.refresh() for cluster " + getClusterName() + ", took "
+            + (endTime - startTime) + " ms for " + (_isTaskCache ? "TASK" : "DEFAULT")
+            + "pipeline");
 
     if (LOG.isDebugEnabled()) {
       LogUtil.logDebug(LOG, _eventId,
           "# of StateModelDefinition read from zk: " + _stateModelDefMap.size());
-      LogUtil
-          .logDebug(LOG, _eventId, "# of ConstraintMap read from zk: " + _constraintMap.size());
+      LogUtil.logDebug(LOG, _eventId, "# of ConstraintMap read from zk: " + _constraintMap.size());
       LogUtil.logDebug(LOG, _eventId, "LiveInstances: " + _liveInstanceMap.keySet());
       for (LiveInstance instance : _liveInstanceMap.values()) {
         LogUtil.logDebug(LOG, _eventId,
             "live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
       }
-      LogUtil
-          .logDebug(LOG, _eventId, "IdealStates: " + _idealStateCache.getIdealStateMap().keySet());
+      LogUtil.logDebug(LOG, _eventId,
+          "IdealStates: " + _idealStateCache.getIdealStateMap().keySet());
       LogUtil.logDebug(LOG, _eventId, "ResourceConfigs: " + _resourceConfigMap.keySet());
       LogUtil.logDebug(LOG, _eventId, "InstanceConfigs: " + _instanceConfigMap.keySet());
       LogUtil.logDebug(LOG, _eventId, "ClusterConfigs: " + _clusterConfig);
-      LogUtil
-          .logDebug(LOG, _eventId, "JobContexts: " + _taskDataCache.getContexts().keySet());
+      LogUtil.logDebug(LOG, _eventId, "JobContexts: " + _taskDataCache.getContexts().keySet());
     }
 
     if (LOG.isTraceEnabled()) {
@@ -301,8 +303,8 @@ public class ClusterDataCache {
         history.reportOffline();
         // persist history back to ZK.
         if (!accessor.setProperty(propertyKey, history)) {
-          LogUtil
-              .logError(LOG, _eventId, "Fails to persist participant online history back to ZK!");
+          LogUtil.logError(LOG, _eventId,
+              "Fails to persist participant online history back to ZK!");
         }
       }
       _instanceOfflineTimeMap.put(instance, history.getLastOfflineTime());
@@ -324,7 +326,6 @@ public class ClusterDataCache {
 
   /**
    * Return the last offline time map for all offline instances.
-   *
    * @return
    */
   public Map<String, Long> getInstanceOfflineTimeMap() {
@@ -355,7 +356,6 @@ public class ClusterDataCache {
     return _liveInstanceMap;
   }
 
-
   /**
    * Return the set of all instances names.
    */
@@ -365,7 +365,6 @@ public class ClusterDataCache {
 
   /**
    * Return all the live nodes that are enabled
-   *
    * @return A new set contains live instance name and that are marked enabled
    */
   public Set<String> getEnabledLiveInstances() {
@@ -377,7 +376,6 @@ public class ClusterDataCache {
 
   /**
    * Return all nodes that are enabled.
-   *
    * @return
    */
   public Set<String> getEnabledInstances() {
@@ -389,10 +387,9 @@ public class ClusterDataCache {
 
   /**
    * Return all the live nodes that are enabled and tagged with given instanceTag.
-   *
    * @param instanceTag The instance group tag.
-   * @return A new set contains live instance name and that are marked enabled and have the specified
-   * tag.
+   * @return A new set contains live instance name and that are marked enabled and have the
+   *         specified tag.
    */
   public Set<String> getEnabledLiveInstancesWithTag(String instanceTag) {
     Set<String> enabledLiveInstancesWithTag = new HashSet<>(getLiveInstances().keySet());
@@ -405,7 +402,6 @@ public class ClusterDataCache {
 
   /**
    * Return all the nodes that are tagged with given instance tag.
-   *
    * @param instanceTag The instance group tag.
    */
   public Set<String> getInstancesWithTag(String instanceTag) {
@@ -420,7 +416,6 @@ public class ClusterDataCache {
     return taggedInstances;
   }
 
-
   public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
     Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
     for (LiveInstance liveInstance : liveInstances) {
@@ -433,10 +428,8 @@ public class ClusterDataCache {
   /**
    * Provides the current state of the node for a given session id, the sessionid can be got from
    * LiveInstance
-   *
    * @param instanceName
    * @param clientSessionId
-   *
    * @return
    */
   public Map<String, CurrentState> getCurrentState(String instanceName, String clientSessionId) {
@@ -503,7 +496,6 @@ public class ClusterDataCache {
 
   /**
    * Returns the resource config map
-   *
    * @return
    */
   public Map<String, ResourceConfig> getResourceConfigMap() {
@@ -526,7 +518,6 @@ public class ClusterDataCache {
 
   /**
    * Returns the instance config map
-   *
    * @return
    */
   public ResourceConfig getResourceConfig(String resource) {
@@ -567,7 +558,6 @@ public class ClusterDataCache {
     return _taskDataCache.getWorkflowConfig(resource);
   }
 
-
   public synchronized void setInstanceConfigs(List<InstanceConfig> instanceConfigs) {
     Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
     for (InstanceConfig instanceConfig : instanceConfigs) {
@@ -584,8 +574,8 @@ public class ClusterDataCache {
    */
   public Set<String> getDisabledInstancesForPartition(String resource, String partition) {
     Set<String> disabledInstancesForPartition = new HashSet<>(_disabledInstanceSet);
-    if (_disabledInstanceForPartitionMap.containsKey(resource) && _disabledInstanceForPartitionMap
-        .get(resource).containsKey(partition)) {
+    if (_disabledInstanceForPartitionMap.containsKey(resource)
+        && _disabledInstanceForPartitionMap.get(resource).containsKey(partition)) {
       disabledInstancesForPartition
           .addAll(_disabledInstanceForPartitionMap.get(resource).get(partition));
     }
@@ -595,7 +585,6 @@ public class ClusterDataCache {
 
   /**
    * This method allows one to fetch the set of nodes that are disabled
-   *
    * @return
    */
   public Set<String> getDisabledInstances() {
@@ -621,18 +610,13 @@ public class ClusterDataCache {
           try {
             replicas = Integer.parseInt(replicasStr);
           } catch (Exception e) {
-            LogUtil.logError(LOG, _eventId,
-                "invalid replicas string: " + replicasStr + " for " + (_isTaskCache
-                    ? "TASK"
-                    : "DEFAULT") + "pipeline");
+            LogUtil.logError(LOG, _eventId, "invalid replicas string: " + replicasStr + " for "
+                + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
           }
         }
       } else {
-        LogUtil.logError(LOG, _eventId,
-            "idealState for resource: " + resourceName + " does NOT have replicas for " + (
-                _isTaskCache
-                    ? "TASK"
-                    : "DEFAULT") + "pipeline");
+        LogUtil.logError(LOG, _eventId, "idealState for resource: " + resourceName
+            + " does NOT have replicas for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
       }
     }
     return replicas;
@@ -675,21 +659,29 @@ public class ClusterDataCache {
       _participantActiveTaskCount.put(liveInstance, 0);
     }
     // Active task == init and running tasks
-    fillActiveTaskCount(currentStateOutput.getPartitionCountWithPendingState(TaskConstants.STATE_MODEL_NAME,
-        TaskPartitionState.INIT.name()), _participantActiveTaskCount);
-    fillActiveTaskCount(currentStateOutput.getPartitionCountWithPendingState(TaskConstants.STATE_MODEL_NAME,
-        TaskPartitionState.RUNNING.name()), _participantActiveTaskCount);
-    fillActiveTaskCount(currentStateOutput.getPartitionCountWithCurrentState(TaskConstants.STATE_MODEL_NAME,
-        TaskPartitionState.INIT.name()), _participantActiveTaskCount);
-    fillActiveTaskCount(currentStateOutput
-        .getPartitionCountWithCurrentState(TaskConstants.STATE_MODEL_NAME,
-            TaskPartitionState.RUNNING.name()), _participantActiveTaskCount);
+    fillActiveTaskCount(
+        currentStateOutput.getPartitionCountWithPendingState(TaskConstants.STATE_MODEL_NAME,
+            TaskPartitionState.INIT.name()),
+        _participantActiveTaskCount);
+    fillActiveTaskCount(
+        currentStateOutput.getPartitionCountWithPendingState(TaskConstants.STATE_MODEL_NAME,
+            TaskPartitionState.RUNNING.name()),
+        _participantActiveTaskCount);
+    fillActiveTaskCount(
+        currentStateOutput.getPartitionCountWithCurrentState(TaskConstants.STATE_MODEL_NAME,
+            TaskPartitionState.INIT.name()),
+        _participantActiveTaskCount);
+    fillActiveTaskCount(
+        currentStateOutput.getPartitionCountWithCurrentState(TaskConstants.STATE_MODEL_NAME,
+            TaskPartitionState.RUNNING.name()),
+        _participantActiveTaskCount);
   }
 
   private void fillActiveTaskCount(Map<String, Integer> additionPartitionMap,
       Map<String, Integer> partitionMap) {
     for (String participant : additionPartitionMap.keySet()) {
-      partitionMap.put(participant, partitionMap.get(participant) + additionPartitionMap.get(participant));
+      partitionMap.put(participant,
+          partitionMap.get(participant) + additionPartitionMap.get(participant));
     }
   }
 
@@ -753,7 +745,6 @@ public class ClusterDataCache {
 
   /**
    * Get local cached external view map
-   *
    * @return
    */
   public Map<String, ExternalView> getExternalViews() {
@@ -762,7 +753,6 @@ public class ClusterDataCache {
 
   /**
    * Update the cached external view map
-   *
    * @param externalViews
    */
   public void updateExternalViews(List<ExternalView> externalViews) {
@@ -773,7 +763,6 @@ public class ClusterDataCache {
 
   /**
    * Remove dead external views from map
-   *
    * @param resourceNames
    */
 
@@ -787,7 +776,7 @@ public class ClusterDataCache {
    * Indicate that a full read should be done on the next refresh
    */
   public synchronized void requireFullRefresh() {
-    for(ChangeType type : ChangeType.values()) {
+    for (ChangeType type : ChangeType.values()) {
       _propertyDataChangedMap.put(type, true);
     }
   }
@@ -802,9 +791,7 @@ public class ClusterDataCache {
 
   /**
    * Get cached resourceAssignment (bestPossible mapping) for a resource
-   *
    * @param resource
-   *
    * @return
    */
   public ResourceAssignment getCachedResourceAssignment(String resource) {
@@ -813,7 +800,6 @@ public class ClusterDataCache {
 
   /**
    * Get cached resourceAssignments
-   *
    * @return
    */
   public Map<String, ResourceAssignment> getCachedResourceAssignments() {
@@ -822,21 +808,16 @@ public class ClusterDataCache {
 
   /**
    * Cache resourceAssignment (bestPossible mapping) for a resource
-   *
    * @param resource
-   *
    * @return
    */
   public void setCachedResourceAssignment(String resource, ResourceAssignment resourceAssignment) {
     _resourceAssignmentCache.put(resource, resourceAssignment);
   }
 
-
   /**
    * Get cached resourceAssignment (ideal mapping) for a resource
-   *
    * @param resource
-   *
    * @return
    */
   public ZNRecord getCachedIdealMapping(String resource) {
@@ -845,7 +826,6 @@ public class ClusterDataCache {
 
   /**
    * Get cached idealmapping
-   *
    * @return
    */
   public Map<String, ZNRecord> getCachedIdealMapping() {
@@ -854,16 +834,13 @@ public class ClusterDataCache {
 
   /**
    * Cache resourceAssignment (ideal mapping) for a resource
-   *
    * @param resource
-   *
    * @return
    */
   public void setCachedIdealMapping(String resource, ZNRecord mapping) {
     _idealMappingCache.put(resource, mapping);
   }
 
-
   public void clearCachedResourceAssignments() {
     _resourceAssignmentCache.clear();
     _idealMappingCache.clear();
@@ -914,6 +891,17 @@ public class ClusterDataCache {
   }
 
   /**
+   * Returns whether there has been LiveInstance change. Once called, it will be set to false. To be
+   * used for task-assigning.
+   * @return
+   */
+  public boolean getExistsLiveInstanceOrCurrentStateChange() {
+    boolean change = _existsLiveInstanceOrCurrentStateChange;
+    _existsLiveInstanceOrCurrentStateChange = false;
+    return change;
+  }
+
+  /**
    * toString method to print the entire cluster state
    */
   @Override
@@ -931,4 +919,4 @@ public class ClusterDataCache {
 
     return sb.toString();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index faa8da5..b996007 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -20,7 +20,6 @@ package org.apache.helix.model;
  */
 
 import com.google.common.collect.Maps;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -65,7 +64,7 @@ public class ClusterConfig extends HelixProperty {
     TARGET_EXTERNALVIEW_ENABLED,
     @Deprecated // ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE will take
         // precedence if it is set
-    ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE, // Controller won't execute load balance state
+        ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE, // Controller won't execute load balance state
     // transition if the number of partitons that need
     // recovery exceeds this limitation
     ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE, // Controller won't execute load balance
@@ -73,6 +72,10 @@ public class ClusterConfig extends HelixProperty {
     // partitons that need recovery or in
     // error exceeds this limitation
     DISABLED_INSTANCES,
+    VIEW_CLUSTER, // Set to "true" to indicate this is a view cluster
+    VIEW_CLUSTER_SOURCES, // Map field, key is the name of source cluster, value is
+    // ViewClusterSourceConfig JSON string
+    VIEW_CLUSTER_REFRESH_PERIOD, // In second
 
     // Specifies job types and used for quota allocation
     QUOTA_TYPES
@@ -107,10 +110,37 @@ public class ClusterConfig extends HelixProperty {
     super(record);
   }
 
+  public void setViewCluster() {
+    _record.setBooleanField(ClusterConfigProperty.VIEW_CLUSTER.name(), true);
+  }
+
   /**
-   * Set task quota type with the ratio of this quota
-   * @param quotaType
-   * @param quotaRatio
+   * Whether this cluster is a ViewCluster
+   * @return
+   */
+  public boolean isViewCluster() {
+    return _record
+        .getBooleanField(ClusterConfigProperty.VIEW_CLUSTER.name(), false);
+  }
+
+  /**
+   * Set task quota type with the ratio of this quota.
+   * @param quotaType String
+   * @param quotaRatio int
+   */
+  public void setTaskQuotaRatio(String quotaType, int quotaRatio) {
+    if (_record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()) == null) {
+      _record.setMapField(ClusterConfigProperty.QUOTA_TYPES.name(), new HashMap<String, String>());
+    }
+    _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name())
+        .put(quotaType, Integer.toString(quotaRatio));
+  }
+
+  /**
+   * Set task quota type with the ratio of this quota. Quota ratio must be a String that is
+   * parse-able into an int.
+   * @param quotaType String
+   * @param quotaRatio String
    */
   public void setTaskQuotaRatio(String quotaType, String quotaRatio) {
     if (_record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()) == null) {
@@ -121,6 +151,16 @@ public class ClusterConfig extends HelixProperty {
   }
 
   /**
+   * Remove task quota with the given quota type.
+   * @param quotaType
+   */
+  public void removeTaskQuotaRatio(String quotaType) {
+    if (_record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()) != null) {
+      _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()).remove(quotaType);
+    }
+  }
+
+  /**
    * Given quota type, return ratio of the quota. If quota type does not exist, return "0"
    * @param quotaType quota type
    * @return ratio of quota type
@@ -144,6 +184,29 @@ public class ClusterConfig extends HelixProperty {
   }
 
   /**
+   * Resets all quota-related information in this ClusterConfig.
+   */
+  public void resetTaskQuotaRatioMap() {
+    if (_record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()) != null) {
+      _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()).clear();
+    }
+  }
+
+  /**
+   * Set view cluster max refresh period
+   * @param refreshPeriod refresh period in second
+   */
+  public void setViewClusterRefreshPeriod(int refreshPeriod) {
+    _record.setIntField(ClusterConfigProperty.VIEW_CLUSTER_REFRESH_PERIOD.name(),
+        refreshPeriod);
+  }
+
+  public int getViewClusterRefershPeriod() {
+    return _record.getIntField(ClusterConfigProperty.VIEW_CLUSTER_REFRESH_PERIOD.name(),
+        DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD);
+  }
+
+  /**
    * Whether to persist best possible assignment in a resource's idealstate.
    *
    * @return
@@ -638,4 +701,4 @@ public class ClusterConfig extends HelixProperty {
   public String getClusterName() {
     return _record.getId();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index b447cf3..36e5698 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -18,6 +18,7 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.task.assigner.AssignableInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +42,12 @@ public abstract class AbstractTaskDispatcher {
       ResourceAssignment prevTaskToInstanceStateAssignment, TaskState jobState,
       Set<Integer> assignedPartitions, Set<Integer> partitionsToDropFromIs,
       Map<Integer, PartitionAssignment> paMap, TargetState jobTgtState,
-      Set<Integer> skippedPartitions) {
+      Set<Integer> skippedPartitions, ClusterDataCache cache) {
+
+    // Get AssignableInstanceMap for releasing resources for tasks in terminal states
+    Map<String, AssignableInstance> assignableInstanceMap =
+        cache.getAssignableInstanceManager().getAssignableInstanceMap();
+
     // Iterate through all instances
     for (String instance : prevInstanceToTaskAssignments.keySet()) {
       if (excludedInstances.contains(instance)) {
@@ -54,8 +60,8 @@ public abstract class AbstractTaskDispatcher {
       Set<Integer> donePartitions = new TreeSet<>();
       for (int pId : pSet) {
         final String pName = pName(jobResource, pId);
-        TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput, jobResource, pId, pName,
-                instance, jobCtx);
+        TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput,
+            jobResource, pId, pName, instance, jobCtx);
 
         // Check for pending state transitions on this (partition, instance).
         Message pendingMessage =
@@ -80,104 +86,160 @@ public abstract class AbstractTaskDispatcher {
           paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
           assignedPartitions.add(pId);
           if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format(
-                "Instance %s requested a state transition to %s for partition %s.", instance,
-                requestedState, pName));
+            LOG.debug(
+                String.format("Instance %s requested a state transition to %s for partition %s.",
+                    instance, requestedState, pName));
           }
           continue;
         }
 
+        // Get AssignableInstance for this instance and TaskConfig for releasing resources
+        String quotaType = jobCfg.getQuotaType();
+        AssignableInstance assignableInstance = assignableInstanceMap.get(instance);
+        String taskId;
+        if (TaskUtil.isGenericTaskJob(jobCfg)) {
+          taskId = jobCtx.getTaskIdForPartition(pId);
+        } else {
+          taskId = pName;
+        }
+        TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
+
         switch (currState) {
-        case RUNNING: {
-          TaskPartitionState nextState = TaskPartitionState.RUNNING;
-          if (jobState.equals(TaskState.TIMING_OUT)) {
-            nextState = TaskPartitionState.TASK_ABORTED;
-          } else if (jobTgtState.equals(TargetState.STOP)) {
-            nextState = TaskPartitionState.STOPPED;
-          }
+          case RUNNING: {
+            TaskPartitionState nextState = TaskPartitionState.RUNNING;
+            if (jobState == TaskState.TIMING_OUT) {
+              nextState = TaskPartitionState.TASK_ABORTED;
+            } else if (jobTgtState == TargetState.STOP) {
+              nextState = TaskPartitionState.STOPPED;
+            } else if (jobState == TaskState.ABORTED || jobState == TaskState.FAILED
+                || jobState == TaskState.FAILING || jobState == TaskState.TIMED_OUT) {
+              // Drop tasks if parent job is not in progress
+              paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
+              break;
+            }
 
-          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
-          assignedPartitions.add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String
-                .format("Setting task partition %s state to %s on instance %s.", pName, nextState,
-                    instance));
-          }
-        }
-        break;
-        case STOPPED: {
-          TaskPartitionState nextState;
-          if (jobTgtState.equals(TargetState.START)) {
-            nextState = TaskPartitionState.RUNNING;
-          } else {
-            nextState = TaskPartitionState.STOPPED;
+            paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
+            assignedPartitions.add(pId);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+                  nextState, instance));
+            }
           }
+          break;
+          case STOPPED: {
+            TaskPartitionState nextState;
+            if (jobTgtState.equals(TargetState.START)) {
+              nextState = TaskPartitionState.RUNNING;
+            } else {
+              nextState = TaskPartitionState.STOPPED;
+              // This task is STOPPED and not going to be re-run, so release this task
+              assignableInstance.release(taskConfig, quotaType);
+            }
 
-          paMap.put(pId, new JobRebalancer.PartitionAssignment(instance, nextState.name()));
-          assignedPartitions.add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String
-                .format("Setting task partition %s state to %s on instance %s.", pName, nextState,
-                    instance));
+            paMap.put(pId, new JobRebalancer.PartitionAssignment(instance, nextState.name()));
+            assignedPartitions.add(pId);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+                  nextState, instance));
+            }
           }
-        }
-        break;
-        case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the context object.
-          donePartitions.add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format(
-                "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
-                pName, currState));
+          break;
+          case COMPLETED: {
+            // The task has completed on this partition. Mark as such in the context object.
+            donePartitions.add(pId);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(String.format(
+                  "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
+                  pName, currState));
+            }
+            partitionsToDropFromIs.add(pId);
+            markPartitionCompleted(jobCtx, pId);
+
+            // This task is COMPLETED, so release this task
+            assignableInstance.release(taskConfig, quotaType);
           }
-          partitionsToDropFromIs.add(pId);
-          markPartitionCompleted(jobCtx, pId);
-        }
-        break;
-        case TIMED_OUT:
-        case TASK_ERROR:
-        case TASK_ABORTED:
-        case ERROR: {
-          donePartitions.add(pId); // The task may be rescheduled on a different instance.
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format(
-                "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.",
-                pName, currState, jobCtx.getPartitionInfo(pId)));
+          break;
+          case TIMED_OUT:
+
+          case TASK_ERROR:
+
+          case TASK_ABORTED:
+
+          case ERROR: {
+            donePartitions.add(pId); // The task may be rescheduled on a different instance.
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(String.format(
+                  "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.",
+                  pName, currState, jobCtx.getPartitionInfo(pId)));
+            }
+            markPartitionError(jobCtx, pId, currState, true);
+            // The error policy is to fail the task as soon a single partition fails for a specified
+            // maximum number of attempts or task is in ABORTED state.
+            // But notice that if job is TIMED_OUT, aborted task won't be treated as fail and won't
+            // cause job fail.
+            // After all tasks are aborted, they will be dropped, because of job timeout.
+            if (jobState != TaskState.TIMED_OUT && jobState != TaskState.TIMING_OUT) {
+              if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()
+                  || currState.equals(TaskPartitionState.TASK_ABORTED)
+                  || currState.equals(TaskPartitionState.ERROR)) {
+                skippedPartitions.add(pId);
+                partitionsToDropFromIs.add(pId);
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("skippedPartitions:" + skippedPartitions);
+                }
+              } else {
+                // Mark the task to be started at some later time (if enabled)
+                markPartitionDelayed(jobCfg, jobCtx, pId);
+              }
+            }
+            // Release this task
+            assignableInstance.release(taskConfig, quotaType);
           }
-          markPartitionError(jobCtx, pId, currState, true);
-          // The error policy is to fail the task as soon a single partition fails for a specified
-          // maximum number of attempts or task is in ABORTED state.
-          // But notice that if job is TIMED_OUT, aborted task won't be treated as fail and won't cause job fail.
-          // After all tasks are aborted, they will be dropped, because of job timeout.
-          if (jobState != TaskState.TIMED_OUT && jobState != TaskState.TIMING_OUT) {
-            if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()
-                || currState.equals(TaskPartitionState.TASK_ABORTED)
-                || currState.equals(TaskPartitionState.ERROR)) {
-              skippedPartitions.add(pId);
+          break;
+          case INIT: {
+            // INIT is a temporary state for tasks
+            // Two possible scenarios for INIT:
+            // 1. Task is getting scheduled for the first time. In this case, Task's state will go
+            // from null->INIT->RUNNING, and this INIT state will be transient and very short-lived
+            // 2. Task is getting scheduled for the first time, but in this case, job is timed out or
+            // timing out. In this case, it will be sent back to INIT state to be removed. Here we
+            // ensure that this task then goes from INIT to DROPPED so that it will be released from
+            // AssignableInstance to prevent resource leak
+            if (jobState == TaskState.TIMED_OUT || jobState == TaskState.TIMING_OUT
+                || jobTgtState == TargetState.DELETE) {
+              // Job is timed out or timing out or targetState is to be deleted, so its tasks will be
+              // sent back to INIT
+              // In this case, tasks' IdealState will be removed, and they will be sent to DROPPED
               partitionsToDropFromIs.add(pId);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("skippedPartitions:" + skippedPartitions);
-              }
-            } else {
-              // Mark the task to be started at some later time (if enabled)
-              markPartitionDelayed(jobCfg, jobCtx, pId);
+
+              // Also release resources for these tasks
+              assignableInstance.release(taskConfig, quotaType);
+
+            } else if (jobState == TaskState.IN_PROGRESS
+                && (jobTgtState != TargetState.STOP && jobTgtState != TargetState.DELETE)) {
+              // Job is in progress, implying that tasks are being re-tried, so set it to RUNNING
+              paMap.put(pId,
+                  new JobRebalancer.PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
+              assignedPartitions.add(pId);
             }
           }
-        }
-        break;
-        case INIT:
-        case DROPPED: {
-          // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
-          donePartitions.add(pId);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format(
-                "Task partition %s has state %s. It will be dropped from the current ideal state.",
-                pName, currState));
+
+          case DROPPED: {
+            // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
+            donePartitions.add(pId);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(String.format(
+                  "Task partition %s has state %s. It will be dropped from the current ideal state.",
+                  pName, currState));
+            }
+            // If it's DROPPED, release this task. If INIT, do not release
+            if (currState == TaskPartitionState.DROPPED) {
+              assignableInstance.release(taskConfig, quotaType);
+            }
           }
-        }
-        break;
-        default:
-          throw new AssertionError("Unknown enum symbol: " + currState);
+          break;
+          default:
+            throw new AssertionError("Unknown enum symbol: " + currState);
         }
       }
 
@@ -190,7 +252,7 @@ public abstract class AbstractTaskDispatcher {
    * Computes the partition name given the resource name and partition id.
    */
   protected String pName(String resource, int pId) {
-    return resource + "_" + pId;
+    return String.format("%s_%s", resource, pId);
   }
 
   /**
@@ -206,37 +268,39 @@ public abstract class AbstractTaskDispatcher {
     }
   }
 
-  private TaskPartitionState updateJobContextAndGetTaskCurrentState(CurrentStateOutput currentStateOutput,
-      String jobResource, Integer pId, String pName, String instance, JobContext jobCtx) {
-    String currentStateString = currentStateOutput.getCurrentState(jobResource, new Partition(
-        pName), instance);
+  private TaskPartitionState updateJobContextAndGetTaskCurrentState(
+      CurrentStateOutput currentStateOutput, String jobResource, Integer pId, String pName,
+      String instance, JobContext jobCtx) {
+    String currentStateString =
+        currentStateOutput.getCurrentState(jobResource, new Partition(pName), instance);
     if (currentStateString == null) {
       // Task state is either DROPPED or INIT
       return jobCtx.getPartitionState(pId);
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
     jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(
-        pName), instance);
+    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
     if (taskMsg != null) {
       jobCtx.setPartitionInfo(pId, taskMsg);
     }
     return currentState;
   }
 
-  private void processTaskWithPendingMessage(ResourceAssignment prevAssignment, Integer pId, String pName,
-      String instance, Message pendingMessage, TaskState jobState, TaskPartitionState currState,
-      Map<Integer, PartitionAssignment> paMap, Set<Integer> assignedPartitions) {
+  private void processTaskWithPendingMessage(ResourceAssignment prevAssignment, Integer pId,
+      String pName, String instance, Message pendingMessage, TaskState jobState,
+      TaskPartitionState currState, Map<Integer, PartitionAssignment> paMap,
+      Set<Integer> assignedPartitions) {
 
     Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
     if (stateMap != null) {
       String prevState = stateMap.get(instance);
       if (!pendingMessage.getToState().equals(prevState)) {
-        LOG.warn(String.format("Task pending to-state is %s while previous assigned state is %s. This should not"
-            + "happen.", pendingMessage.getToState(), prevState));
+        LOG.warn(String.format(
+            "Task pending to-state is %s while previous assigned state is %s. This should not"
+                + "happen.",
+            pendingMessage.getToState(), prevState));
       }
-      if (jobState == TaskState.TIMING_OUT
-          && currState == TaskPartitionState.INIT
+      if (jobState == TaskState.TIMING_OUT && currState == TaskPartitionState.INIT
           && prevState.equals(TaskPartitionState.RUNNING.name())) {
         // While job is timing out, if the task is pending on INIT->RUNNING, set it back to INIT,
         // so that Helix will cancel the transition.
@@ -309,8 +373,10 @@ public abstract class AbstractTaskDispatcher {
   }
 
   protected void failJob(String jobName, WorkflowContext workflowContext, JobContext jobContext,
-      WorkflowConfig workflowConfig, Map<String, JobConfig> jobConfigMap) {
-    markJobFailed(jobName, jobContext, workflowConfig, workflowContext, jobConfigMap);
+      WorkflowConfig workflowConfig, Map<String, JobConfig> jobConfigMap,
+      ClusterDataCache clusterDataCache) {
+    markJobFailed(jobName, jobContext, workflowConfig, workflowContext, jobConfigMap,
+        clusterDataCache);
     // Mark all INIT task to TASK_ABORTED
     for (int pId : jobContext.getPartitionSet()) {
       if (jobContext.getPartitionState(pId) == TaskPartitionState.INIT) {
@@ -322,7 +388,8 @@ public abstract class AbstractTaskDispatcher {
     TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
   }
 
-  // Compute real assignment from theoretical calculation with applied throttling or other logics
+  // Compute real assignment from theoretical calculation with applied throttling
+  // This is the actual assigning part
   protected void handleAdditionalTaskAssignment(
       Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, Set<String> excludedInstances,
       String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx, JobConfig jobCfg,
@@ -331,6 +398,11 @@ public abstract class AbstractTaskDispatcher {
       Map<Integer, PartitionAssignment> paMap, Set<Integer> skippedPartitions,
       TaskAssignmentCalculator taskAssignmentCal, Set<Integer> allPartitions, long currentTime,
       Collection<String> liveInstances) {
+
+    // See if there was LiveInstance change and cache LiveInstances from this iteration of pipeline
+    boolean existsLiveInstanceOrCurrentStateChange =
+        cache.getExistsLiveInstanceOrCurrentStateChange();
+
     // The excludeSet contains the set of task partitions that must be excluded from consideration
     // when making any new assignments.
     // This includes all completed, failed, delayed, and already assigned partitions.
@@ -338,20 +410,86 @@ public abstract class AbstractTaskDispatcher {
     addCompletedTasks(excludeSet, jobCtx, allPartitions);
     addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
     excludeSet.addAll(skippedPartitions);
-    excludeSet.addAll(TaskUtil.getNonReadyPartitions(jobCtx, currentTime));
-    // Get instance->[partition, ...] mappings for the target resource.
-    Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal
-        .getTaskAssignment(currStateOutput, prevTaskToInstanceStateAssignment, liveInstances,
-            jobCfg, jobCtx, workflowConfig, workflowCtx, allPartitions, cache.getIdealStates());
+    Set<Integer> partitionsWithDelay = TaskUtil.getNonReadyPartitions(jobCtx, currentTime);
+    excludeSet.addAll(partitionsWithDelay);
+
+    // The following is filtering of tasks before passing them to the assigner
+    // Only feed in tasks that need to be assigned (null and STOPPED)
+    Set<Integer> filteredTaskPartitionNumbers = filterTasks(allPartitions, jobCtx, liveInstances);
+    // Remove all excludeSet tasks to be safer because some STOPPED tasks have been already
+    // re-started (excludeSet includes already-assigned partitions). Also tasks with their retry
+    // limit exceed (addGiveupPartitions) will be removed as well
+    filteredTaskPartitionNumbers.removeAll(excludeSet);
+
+    Set<Integer> partitionsToRetryOnLiveInstanceChangeForTargetedJob = new HashSet<>();
+    // If the job is a targeted job, in case of live instance change, we need to assign
+    // non-terminal tasks so that they could be re-scheduled
+    if (!TaskUtil.isGenericTaskJob(jobCfg) && existsLiveInstanceOrCurrentStateChange) {
+      // This job is a targeted job, so FixedAssignmentCalculator will be used
+      // There has been a live instance change. Must re-add incomplete task partitions to be
+      // re-assigned and re-scheduled
+      for (int partitionNum : allPartitions) {
+        TaskPartitionState taskPartitionState = jobCtx.getPartitionState(partitionNum);
+        if (isTaskNotInTerminalState(taskPartitionState)
+            && !partitionsWithDelay.contains(partitionNum)) {
+          // Some targeted tasks may have timed-out due to Participants (instances) not being
+          // live, so we give tasks like these another try
+          // If some of these tasks are already scheduled and running, they will be dropped as
+          // well
+          // Also, do not include partitions with delay that are not ready to be assigned and
+          // scheduled
+          partitionsToRetryOnLiveInstanceChangeForTargetedJob.add(partitionNum);
+        }
+      }
+    }
+    filteredTaskPartitionNumbers.addAll(partitionsToRetryOnLiveInstanceChangeForTargetedJob);
 
-    if (!TaskUtil.isGenericTaskJob(jobCfg) || jobCfg.isRebalanceRunningTask()) {
+    // The actual assignment is computed here
+    // Get instance->[partition, ...] mappings for the target resource.
+    Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal.getTaskAssignment(
+        currStateOutput, prevTaskToInstanceStateAssignment, liveInstances, jobCfg, jobCtx,
+        workflowConfig, workflowCtx, filteredTaskPartitionNumbers, cache.getIdealStates());
+
+    if (!TaskUtil.isGenericTaskJob(jobCfg) && jobCfg.isRebalanceRunningTask()) {
+      // TODO: Revisit the logic for isRebalanceRunningTask() and valid use cases for it
+      // TODO: isRebalanceRunningTask() was originally put in place to allow users to move
+      // ("rebalance") long-running tasks, but there hasn't been a clear use case for this
+      // Previously, there was a bug in the condition above (it was || where it should have been &&)
       dropRebalancedRunningTasks(tgtPartitionAssignments, prevInstanceToTaskAssignments, paMap,
           jobCtx);
     }
 
+    // If this is a targeted job and if there was a live instance change
+    if (!TaskUtil.isGenericTaskJob(jobCfg) && existsLiveInstanceOrCurrentStateChange) {
+      // Drop current jobs only if they are assigned to a different instance, regardless of
+      // the jobCfg.isRebalanceRunningTask() setting
+      dropRebalancedRunningTasks(tgtPartitionAssignments, prevInstanceToTaskAssignments, paMap,
+          jobCtx);
+    }
+    // Go through ALL instances and assign/throttle tasks accordingly
     for (Map.Entry<String, SortedSet<Integer>> entry : prevInstanceToTaskAssignments.entrySet()) {
       String instance = entry.getKey();
-      if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances.contains(instance)) {
+      if (!tgtPartitionAssignments.containsKey(instance)) {
+        // There is no assignment made for this instance, so it is safe to skip
+        continue;
+      }
+      if (excludedInstances.contains(instance)) {
+        // There is a task assignment made for this instance, but for some reason, we cannot
+        // assign to this instance. So we must skip the actual scheduling, but we must also
+        // release the prematurely assigned tasks from AssignableInstance
+        if (!cache.getAssignableInstanceManager().getAssignableInstanceMap()
+            .containsKey(instance)) {
+          continue; // This should not happen; skip!
+        }
+        AssignableInstance assignableInstance =
+            cache.getAssignableInstanceManager().getAssignableInstanceMap().get(instance);
+        String quotaType = jobCfg.getQuotaType();
+        for (int partitionNum : tgtPartitionAssignments.get(instance)) {
+          // Get the TaskConfig for this partitionNumber
+          String taskId = getTaskId(jobCfg, jobCtx, partitionNum);
+          TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
+          assignableInstance.release(taskConfig, quotaType);
+        }
         continue;
       }
       // 1. throttled by job configuration
@@ -370,14 +508,13 @@ public abstract class AbstractTaskDispatcher {
       if (LOG.isDebugEnabled()) {
         LOG.debug(String.format(
             "Throttle tasks to be assigned to instance %s using limitation: Job Concurrent Task(%d), "
-                + "Participant Max Task(%d). Remaining capacity %d.", instance, jobCfgLimitation,
-            participantCapacity, numToAssign));
+                + "Participant Max Task(%d). Remaining capacity %d.",
+            instance, jobCfgLimitation, participantCapacity, numToAssign));
       }
+      Set<Integer> throttledSet = new HashSet<>();
       if (numToAssign > 0) {
-        Set<Integer> throttledSet = new HashSet<Integer>();
-        List<Integer> nextPartitions =
-            getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, throttledSet,
-                numToAssign);
+        List<Integer> nextPartitions = getNextPartitions(tgtPartitionAssignments.get(instance),
+            excludeSet, throttledSet, numToAssign);
         for (Integer pId : nextPartitions) {
           String pName = pName(jobResource, pId);
           paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
@@ -392,10 +529,33 @@ public abstract class AbstractTaskDispatcher {
         }
         cache.setParticipantActiveTaskCount(instance,
             cache.getParticipantActiveTaskCount(instance) + nextPartitions.size());
-        if (!throttledSet.isEmpty()) {
-          LOG.debug(
-              throttledSet.size() + "tasks are ready but throttled when assigned to participant.");
+      } else {
+        // No assignment was actually scheduled, so this assignment needs to be released
+        // Put all assignments in throttledSet. Be sure to subtract excludeSet because excludeSet
+        // was already applied when the partitions were filtered (excludeSet may contain partitions
+        // that are currently running)
+        Set<Integer> throttledSetWithExcludeSet =
+            new HashSet<>(tgtPartitionAssignments.get(instance));
+        throttledSetWithExcludeSet.removeAll(excludeSet); // Remove excludeSet
+        throttledSet.addAll(throttledSetWithExcludeSet);
+      }
+      if (!throttledSet.isEmpty()) {
+        // Release the tasks in throttledSet because they weren't actually assigned
+        if (!cache.getAssignableInstanceManager().getAssignableInstanceMap()
+            .containsKey(instance)) {
+          continue;
+        }
+        AssignableInstance assignableInstance =
+            cache.getAssignableInstanceManager().getAssignableInstanceMap().get(instance);
+        String quotaType = jobCfg.getQuotaType();
+        for (int partitionNum : throttledSet) {
+          // Get the TaskConfig for this partitionNumber
+          String taskId = getTaskId(jobCfg, jobCtx, partitionNum);
+          TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
+          assignableInstance.release(taskConfig, quotaType);
         }
+        LOG.debug(throttledSet.size()
+            + " tasks are ready but were throttled when being assigned to the participant.");
       }
     }
   }
@@ -408,9 +568,8 @@ public abstract class AbstractTaskDispatcher {
       long retryTime = jobCtx.getNextRetryTime(p);
       TaskPartitionState state = jobCtx.getPartitionState(p);
       state = (state != null) ? state : TaskPartitionState.INIT;
-      Set<TaskPartitionState> errorStates =
-          Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR,
-              TaskPartitionState.TIMED_OUT);
+      Set<TaskPartitionState> errorStates = Sets.newHashSet(TaskPartitionState.ERROR,
+          TaskPartitionState.TASK_ERROR, TaskPartitionState.TIMED_OUT);
       if (errorStates.contains(state) && retryTime > now && retryTime < earliestTime) {
         earliestTime = retryTime;
         shouldSchedule = true;
@@ -426,11 +585,9 @@ public abstract class AbstractTaskDispatcher {
     }
   }
 
-
-
   // add all partitions that have been tried maxNumberAttempts
-  protected static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds,
-      JobConfig cfg) {
+  protected static void addGiveupPartitions(Set<Integer> set, JobContext ctx,
+      Iterable<Integer> pIds, JobConfig cfg) {
     for (Integer pId : pIds) {
       if (isTaskGivenup(ctx, cfg, pId)) {
         set.add(pId);
@@ -462,6 +619,49 @@ public abstract class AbstractTaskDispatcher {
     }
   }
 
+  /**
+   * Returns a filtered set of tasks. In this context, filtering means only allowing tasks whose
+   * contexts are either null or in the STOPPED, TIMED_OUT, TASK_ERROR, or DROPPED state, because
+   * only tasks in these states are eligible to be assigned or re-tried.
+   * In addition, tasks in non-terminal states whose previously assigned instances are no longer
+   * live are re-added so that they can be re-assigned.
+   * @param allPartitions
+   * @param jobContext
+   * @param liveInstances
+   * @return a filtered Set of task partition numbers
+   */
+  private Set<Integer> filterTasks(Iterable<Integer> allPartitions, JobContext jobContext,
+      Collection<String> liveInstances) {
+    Set<Integer> filteredTasks = new HashSet<>();
+    for (int partitionNumber : allPartitions) {
+      TaskPartitionState state = jobContext.getPartitionState(partitionNumber);
+      // Allow tasks eligible for scheduling
+      if (state == null || state == TaskPartitionState.STOPPED
+          || state == TaskPartitionState.TIMED_OUT || state == TaskPartitionState.TASK_ERROR
+          || state == TaskPartitionState.DROPPED) {
+        filteredTasks.add(partitionNumber);
+      }
+      // Allow tasks whose assigned instances are no longer live for rescheduling
+      if (isTaskNotInTerminalState(state)) {
+        String assignedParticipant = jobContext.getAssignedParticipant(partitionNumber);
+        if (assignedParticipant != null && !liveInstances.contains(assignedParticipant)) {
+          filteredTasks.add(partitionNumber);
+        }
+      }
+    }
+    return filteredTasks;
+  }
+
+  /**
+   * Returns true if the task is not in a terminal state and can still be re-scheduled.
+   * @param state
+   * @return
+   */
+  private boolean isTaskNotInTerminalState(TaskPartitionState state) {
+    return state != TaskPartitionState.COMPLETED && state != TaskPartitionState.TASK_ABORTED
+        && state != TaskPartitionState.DROPPED && state != TaskPartitionState.ERROR;
+  }
+
   protected static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) {
     TaskPartitionState state = ctx.getPartitionState(pId);
     if (state == TaskPartitionState.TASK_ABORTED || state == TaskPartitionState.ERROR) {
@@ -473,21 +673,38 @@ public abstract class AbstractTaskDispatcher {
     return false;
   }
 
-
   /**
    * If assignment is different from previous assignment, drop the old running task if it's no
    * longer assigned to the same instance, but not removing it from excludeSet because the same task
    * should not be assigned to the new instance right away.
+   * Also, a task is only dropped if both the old and the new assignments contain its partition
+   * and they differ (meaning the task has been assigned to a different instance).
    */
   private void dropRebalancedRunningTasks(Map<String, SortedSet<Integer>> newAssignment,
       Map<String, SortedSet<Integer>> oldAssignment, Map<Integer, PartitionAssignment> paMap,
       JobContext jobContext) {
+
     for (String instance : oldAssignment.keySet()) {
-      for (Integer pId : oldAssignment.get(instance)) {
-        if (jobContext.getPartitionState(pId) == TaskPartitionState.RUNNING && !newAssignment
-            .get(instance).contains(pId)) {
-          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
-          jobContext.setPartitionState(pId, TaskPartitionState.DROPPED);
+      for (int pId : oldAssignment.get(instance)) {
+        if (jobContext.getPartitionState(pId) == TaskPartitionState.RUNNING) {
+          // Check if the new assignment has this task on a different instance
+          boolean existsInNewAssignment = false;
+          for (Map.Entry<String, SortedSet<Integer>> entry : newAssignment.entrySet()) {
+            if (!entry.getKey().equals(instance) && entry.getValue().contains(pId)) {
+              // Found the partition number; new assignment has been made
+              existsInNewAssignment = true;
+              LOG.info(
+                  "Currently running task partition number: {} is being dropped from instance: {} and will be newly assigned to instance: {}. This is due to a LiveInstance/CurrentState change, and because this is a targeted task.",
+                  pId, instance, entry.getKey());
+              break;
+            }
+          }
+          if (existsInNewAssignment) {
+            // We need to drop this task in the old assignment
+            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
+            jobContext.setPartitionState(pId, TaskPartitionState.DROPPED);
+            // Now it will be dropped and be rescheduled
+          }
         }
       }
     }
@@ -495,11 +712,11 @@ public abstract class AbstractTaskDispatcher {
 
   protected void markJobComplete(String jobName, JobContext jobContext,
       WorkflowConfig workflowConfig, WorkflowContext workflowContext,
-      Map<String, JobConfig> jobConfigMap) {
+      Map<String, JobConfig> jobConfigMap, ClusterDataCache clusterDataCache) {
     long currentTime = System.currentTimeMillis();
     workflowContext.setJobState(jobName, TaskState.COMPLETED);
     jobContext.setFinishTime(currentTime);
-    if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap)) {
+    if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap, clusterDataCache)) {
       workflowContext.setFinishTime(currentTime);
       updateWorkflowMonitor(workflowContext, workflowConfig);
     }
@@ -507,13 +724,14 @@ public abstract class AbstractTaskDispatcher {
   }
 
   protected void markJobFailed(String jobName, JobContext jobContext, WorkflowConfig workflowConfig,
-      WorkflowContext workflowContext, Map<String, JobConfig> jobConfigMap) {
+      WorkflowContext workflowContext, Map<String, JobConfig> jobConfigMap,
+      ClusterDataCache clusterDataCache) {
     long currentTime = System.currentTimeMillis();
     workflowContext.setJobState(jobName, TaskState.FAILED);
     if (jobContext != null) {
       jobContext.setFinishTime(currentTime);
     }
-    if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap)) {
+    if (isWorkflowFinished(workflowContext, workflowConfig, jobConfigMap, clusterDataCache)) {
       workflowContext.setFinishTime(currentTime);
       updateWorkflowMonitor(workflowContext, workflowConfig);
     }
@@ -523,8 +741,7 @@ public abstract class AbstractTaskDispatcher {
   protected void scheduleJobCleanUp(JobConfig jobConfig, WorkflowConfig workflowConfig,
       long currentTime) {
     long currentScheduledTime =
-        _rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId()) == -1
-            ? Long.MAX_VALUE
+        _rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId()) == -1 ? Long.MAX_VALUE
             : _rebalanceScheduler.getRebalanceTime(workflowConfig.getWorkflowId());
     if (currentTime + jobConfig.getExpiry() < currentScheduledTime) {
       _rebalanceScheduler.scheduleRebalance(_manager, workflowConfig.getWorkflowId(),
@@ -532,25 +749,21 @@ public abstract class AbstractTaskDispatcher {
     }
   }
 
-
-
-
   // Workflow related methods
 
   /**
    * Checks if the workflow has finished (either completed or failed).
    * Set the state in workflow context properly.
-   *
    * @param ctx Workflow context containing job states
    * @param cfg Workflow config containing set of jobs
    * @return returns true if the workflow
-   *            1. completed (all tasks are {@link TaskState#COMPLETED})
-   *            2. failed (any task is {@link TaskState#FAILED}
-   *            3. workflow is {@link TaskState#TIMED_OUT}
+   *         1. completed (all tasks are {@link TaskState#COMPLETED})
+   *         2. failed (any task is {@link TaskState#FAILED})
+   *         3. workflow is {@link TaskState#TIMED_OUT}
    *         returns false otherwise.
    */
   protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg,
-      Map<String, JobConfig> jobConfigMap) {
+      Map<String, JobConfig> jobConfigMap, ClusterDataCache clusterDataCache) {
     boolean incomplete = false;
 
     TaskState workflowState = ctx.getWorkflowState();
@@ -568,13 +781,32 @@ public abstract class AbstractTaskDispatcher {
         failedJobs++;
         if (!cfg.isJobQueue() && failedJobs > cfg.getFailureThreshold()) {
           ctx.setWorkflowState(TaskState.FAILED);
+          LOG.info("Workflow {} reached the failure threshold, so setting its state to FAILED.", cfg.getWorkflowId());
           for (String jobToFail : cfg.getJobDag().getAllNodes()) {
             if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
               ctx.setJobState(jobToFail, TaskState.ABORTED);
+
               // Skip aborted jobs latency since they are not accurate latency for job running time
               if (_clusterStatusMonitor != null) {
-                _clusterStatusMonitor
-                    .updateJobCounters(jobConfigMap.get(jobToFail), TaskState.ABORTED);
+                _clusterStatusMonitor.updateJobCounters(jobConfigMap.get(jobToFail),
+                    TaskState.ABORTED);
+              }
+
+              // Since the job is aborted, release the resources occupied by its tasks;
+              // otherwise, we run the risk of a resource leak
+              if (clusterDataCache != null) {
+                Iterable<AssignableInstance> assignableInstances = clusterDataCache
+                    .getAssignableInstanceManager().getAssignableInstanceMap().values();
+                JobConfig jobConfig = jobConfigMap.get(jobToFail);
+                String quotaType = jobConfig.getQuotaType();
+                Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
+                // Iterate over all tasks and release them
+                for (Map.Entry<String, TaskConfig> taskEntry : taskConfigMap.entrySet()) {
+                  TaskConfig taskConfig = taskEntry.getValue();
+                  for (AssignableInstance assignableInstance : assignableInstances) {
+                    assignableInstance.release(taskConfig, quotaType);
+                  }
+                }
               }
             }
           }
@@ -586,29 +818,25 @@ public abstract class AbstractTaskDispatcher {
         incomplete = true;
       }
     }
-
     if (!incomplete && cfg.isTerminable()) {
       ctx.setWorkflowState(TaskState.COMPLETED);
       return true;
     }
-
     return false;
   }
 
-  protected void updateWorkflowMonitor(WorkflowContext context,
-      WorkflowConfig config) {
+  protected void updateWorkflowMonitor(WorkflowContext context, WorkflowConfig config) {
     if (_clusterStatusMonitor != null) {
       _clusterStatusMonitor.updateWorkflowCounters(config, context.getWorkflowState(),
           context.getFinishTime() - context.getStartTime());
     }
   }
 
-
   // Common methods
 
   protected Set<String> getExcludedInstances(String currentJobName, WorkflowConfig workflowCfg,
-      ClusterDataCache cache) {
-    Set<String> ret = new HashSet<String>();
+      WorkflowContext workflowContext, ClusterDataCache cache) {
+    Set<String> ret = new HashSet<>();
 
     if (!workflowCfg.isAllowOverlapJobAssignment()) {
       // exclude all instances that has been assigned other jobs' tasks
@@ -620,6 +848,14 @@ public abstract class AbstractTaskDispatcher {
         if (jobContext == null) {
           continue;
         }
+        // Also skip if the job is not currently running
+        // For example, if the job here is in a terminal state (such as ABORTED), then its tasks
+        // are practically not running, so we do not need to exclude instances that hold tasks
+        // from dead jobs
+        TaskState jobState = workflowContext.getJobState(jobName);
+        if (jobState != TaskState.IN_PROGRESS) {
+          continue;
+        }
         for (int pId : jobContext.getPartitionSet()) {
           TaskPartitionState partitionState = jobContext.getPartitionState(pId);
           if (partitionState == TaskPartitionState.INIT
@@ -634,44 +870,59 @@ public abstract class AbstractTaskDispatcher {
 
   /**
    * Schedule the rebalancer timer for task framework elements
-   * @param resourceId       The resource id
-   * @param startTime        The resource start time
-   * @param timeoutPeriod    The resource timeout period. Will be -1 if it is not set.
+   * @param resourceId The resource id
+   * @param startTime The resource start time
+   * @param timeoutPeriod The resource timeout period. Will be -1 if it is not set.
    */
   protected void scheduleRebalanceForTimeout(String resourceId, long startTime,
       long timeoutPeriod) {
     long nextTimeout = getTimeoutTime(startTime, timeoutPeriod);
     long nextRebalanceTime = _rebalanceScheduler.getRebalanceTime(resourceId);
-    if (nextTimeout >= System.currentTimeMillis() && (
-        nextRebalanceTime == TaskConstants.DEFAULT_NEVER_TIMEOUT
-            || nextTimeout < nextRebalanceTime)) {
+    if (nextTimeout >= System.currentTimeMillis()
+        && (nextRebalanceTime == TaskConstants.DEFAULT_NEVER_TIMEOUT
+        || nextTimeout < nextRebalanceTime)) {
       _rebalanceScheduler.scheduleRebalance(_manager, resourceId, nextTimeout);
     }
   }
 
   /**
    * Basic function to check task framework resources, workflow and job, are timeout
-   * @param startTime       Resources start time
-   * @param timeoutPeriod   Resources timeout period. Will be -1 if it is not set.
+   * @param startTime Resources start time
+   * @param timeoutPeriod Resources timeout period. Will be -1 if it is not set.
    * @return
    */
   protected boolean isTimeout(long startTime, long timeoutPeriod) {
     long nextTimeout = getTimeoutTime(startTime, timeoutPeriod);
-    return nextTimeout != TaskConstants.DEFAULT_NEVER_TIMEOUT && nextTimeout <= System
-        .currentTimeMillis();
+    return nextTimeout != TaskConstants.DEFAULT_NEVER_TIMEOUT
+        && nextTimeout <= System.currentTimeMillis();
   }
 
   private long getTimeoutTime(long startTime, long timeoutPeriod) {
     return (timeoutPeriod == TaskConstants.DEFAULT_NEVER_TIMEOUT
         || timeoutPeriod > Long.MAX_VALUE - startTime) // check long overflow
-        ? TaskConstants.DEFAULT_NEVER_TIMEOUT : startTime + timeoutPeriod;
+        ? TaskConstants.DEFAULT_NEVER_TIMEOUT
+        : startTime + timeoutPeriod;
   }
 
-
   /**
    * Set the ClusterStatusMonitor for metrics update
    */
   public void setClusterStatusMonitor(ClusterStatusMonitor clusterStatusMonitor) {
     _clusterStatusMonitor = clusterStatusMonitor;
   }
-}
+
+  /**
+   * Returns an appropriate TaskId depending on whether the job is targeted or not.
+   * @param jobCfg
+   * @param jobCtx
+   * @param partitionNum
+   * @return
+   */
+  private String getTaskId(JobConfig jobCfg, JobContext jobCtx, int partitionNum) {
+    if (TaskUtil.isGenericTaskJob(jobCfg)) {
+      return jobCtx.getTaskIdForPartition(partitionNum);
+    }
+    // This is a targeted task
+    return pName(jobCfg.getJobId(), partitionNum);
+  }
+}
\ No newline at end of file

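To summarize the release pattern the AbstractTaskDispatcher changes above rely on, here is a minimal sketch (not the committed code) of handing un-scheduled partitions back to an AssignableInstance so quota is not leaked. It assumes only the Helix task classes shown in the diff (AssignableInstance, TaskConfig, JobConfig, JobContext, ClusterDataCache); releaseUnscheduled is an illustrative helper name, not a Helix API.

    // Minimal sketch, assuming the AssignableInstance/TaskConfig APIs used above.
    // releaseUnscheduled is an illustrative helper, not part of Helix.
    private void releaseUnscheduled(ClusterDataCache cache, String instance,
        Set<Integer> unscheduledPartitions, JobConfig jobCfg, JobContext jobCtx) {
      Map<String, AssignableInstance> assignableInstances =
          cache.getAssignableInstanceManager().getAssignableInstanceMap();
      if (!assignableInstances.containsKey(instance)) {
        return; // Instance is no longer assignable; nothing to release
      }
      AssignableInstance assignableInstance = assignableInstances.get(instance);
      String quotaType = jobCfg.getQuotaType();
      for (int partitionNum : unscheduledPartitions) {
        // Targeted jobs use pName-based task ids; generic jobs use the id stored in the context
        String taskId = TaskUtil.isGenericTaskJob(jobCfg)
            ? jobCtx.getTaskIdForPartition(partitionNum)
            : pName(jobCfg.getJobId(), partitionNum);
        TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
        assignableInstance.release(taskConfig, quotaType);
      }
    }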
http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index f2f0129..701e444 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -66,6 +66,8 @@ public class AssignableInstanceManager {
     // Only need to build from scratch during Controller switch, etc.
     // This keeps the pipeline from building from scratch every cache refresh
     if (_hasBeenBuilt) {
+      // If it has already been built, just update (config and LiveInstance changes may be present)
+      updateAssignableInstances(clusterConfig, liveInstances, instanceConfigs);
       return;
     }
 
@@ -186,12 +188,14 @@ public class AssignableInstanceManager {
       // Remove all tasks on this instance first
       for (String taskToRemove : instanceToBeRemoved.getCurrentAssignments()) {
         // Check that AssignableInstances match
-        if (_taskAssignResultMap.get(taskToRemove).getAssignableInstance().getInstanceName()
-            .equals(instanceToBeRemoved.getInstanceName())) {
-          _taskAssignResultMap.remove(taskToRemove); // TODO: Hunter: Move this if necessary
-          LOG.info(
-              "TaskAssignResult removed because its assigned instance is no longer live. TaskID: {}, instance: {}",
-              taskToRemove, instanceToBeRemoved.getInstanceName());
+        if (_taskAssignResultMap.containsKey(taskToRemove)) {
+          if (_taskAssignResultMap.get(taskToRemove).getAssignableInstance().getInstanceName()
+              .equals(instanceToBeRemoved.getInstanceName())) {
+            _taskAssignResultMap.remove(taskToRemove); // TODO: Hunter: Move this if necessary
+            LOG.info(
+                "TaskAssignResult removed because its assigned instance is no longer live. TaskID: {}, instance: {}",
+                taskToRemove, instanceToBeRemoved.getInstanceName());
+          }
         }
       }
       _assignableInstanceMap.remove(instanceToBeRemoved.getInstanceName());

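The first hunk above makes the build step idempotent across cache refreshes: once the AssignableInstances have been built, later invocations only fold in config and LiveInstance changes. A minimal sketch of that guard follows; the enclosing method name, its parameters, and buildFromScratch() are illustrative assumptions, and only the _hasBeenBuilt flag and the updateAssignableInstances() call come from the diff itself.

    // Sketch of the idempotent build guard (method name and buildFromScratch() are
    // placeholders; only _hasBeenBuilt and updateAssignableInstances() appear above).
    public void buildAssignableInstances(ClusterConfig clusterConfig,
        Map<String, LiveInstance> liveInstances, Map<String, InstanceConfig> instanceConfigs) {
      if (_hasBeenBuilt) {
        // Already built (e.g. by a previous pipeline run): only apply config and
        // LiveInstance changes instead of rebuilding on every cache refresh
        updateAssignableInstances(clusterConfig, liveInstances, instanceConfigs);
        return;
      }
      buildFromScratch(clusterConfig, liveInstances, instanceConfigs); // placeholder
      _hasBeenBuilt = true;
    }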
http://git-wip-us.apache.org/repos/asf/helix/blob/4db61b56/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 0ce6012..e7dd959 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -22,6 +22,7 @@ package org.apache.helix.task;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,6 +37,8 @@ import org.apache.helix.model.ResourceAssignment;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.helix.task.assigner.AssignableInstance;
+import org.apache.helix.task.assigner.TaskAssignResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +51,24 @@ import org.slf4j.LoggerFactory;
 public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculator {
   private static final Logger LOG =
       LoggerFactory.getLogger(FixedTargetTaskAssignmentCalculator.class);
+  private AssignableInstanceManager _assignableInstanceManager;
+
+  /**
+   * Default constructor. Quota-based scheduling requires an AssignableInstanceManager, so this
+   * constructor should not be used.
+   */
+  @Deprecated
+  public FixedTargetTaskAssignmentCalculator() {
+  }
+
+  /**
+   * Constructor for FixedTargetTaskAssignmentCalculator. Requires AssignableInstanceManager for
+   * "charging" resources per task.
+   * @param assignableInstanceManager
+   */
+  public FixedTargetTaskAssignmentCalculator(AssignableInstanceManager assignableInstanceManager) {
+    _assignableInstanceManager = assignableInstanceManager;
+  }
 
   @Override
   public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
@@ -62,14 +83,8 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
       ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
       JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
       Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
-    IdealState tgtIs = idealStateMap.get(jobCfg.getTargetResource());
-    if (tgtIs == null) {
-      LOG.warn("Missing target resource for the scheduled job!");
-      return Collections.emptyMap();
-    }
-    Set<String> tgtStates = jobCfg.getTargetPartitionStates();
-    return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, tgtStates, partitionSet,
-        jobContext);
+    return computeAssignmentAndChargeResource(currStateOutput, prevAssignment, instances, jobCfg,
+        jobContext, partitionSet, idealStateMap);
   }
 
   /**
@@ -112,6 +127,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
   }
 
   /**
+   * NOTE: this method has been deprecated due to the addition of quota-based task scheduling.
    * Get partition assignments for the target resource, but only for the partitions of interest.
    * @param currStateOutput The current state of the instances in the cluster.
    * @param instances The instances.
@@ -122,6 +138,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
    * @param includeSet The set of partitions to consider.
    * @return A map of instance vs set of partition ids assigned to that instance.
    */
+  @Deprecated
   private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
       CurrentStateOutput currStateOutput, Iterable<String> instances, IdealState tgtIs,
       Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
@@ -153,7 +170,151 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
         }
       }
     }
+    return result;
+  }
+
+  /**
+   * Calculate the assignment for given tasks. This assignment also charges resources for each task
+   * and takes resource/quota availability into account while assigning.
+   * @param currStateOutput
+   * @param prevAssignment
+   * @param liveInstances
+   * @param jobCfg
+   * @param jobContext
+   * @param taskPartitionSet
+   * @param idealStateMap
+   * @return instance -> set of task partition numbers
+   */
+  private Map<String, SortedSet<Integer>> computeAssignmentAndChargeResource(
+      CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
+      Collection<String> liveInstances, JobConfig jobCfg, JobContext jobContext,
+      Set<Integer> taskPartitionSet, Map<String, IdealState> idealStateMap) {
+
+    // Note: targeted jobs also take up capacity in quota-based scheduling
+    // "Charge" resources for the tasks
+    Map<String, AssignableInstance> assignableInstanceMap =
+        _assignableInstanceManager.getAssignableInstanceMap();
+    String quotaType = jobCfg.getQuotaType();
+    if (quotaType == null || quotaType.equals("") || quotaType.equals("null")) {
+      quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
+    }
+
+    // IdealState of the target resource
+    IdealState targetIdealState = idealStateMap.get(jobCfg.getTargetResource());
+    if (targetIdealState == null) {
+      LOG.warn("Missing target resource for the scheduled job!");
+      return Collections.emptyMap();
+    }
+
+    // The "states" you want to assign to. Assign to the partitions of the target resource if these
+    // partitions are in one of these states
+    Set<String> targetStates = jobCfg.getTargetPartitionStates();
+
+    Map<String, SortedSet<Integer>> result = new HashMap<>();
+    for (String instance : liveInstances) {
+      result.put(instance, new TreeSet<Integer>());
+    }
+
+    // <Target resource partition name -> list of task partition numbers> mapping
+    Map<String, List<Integer>> partitionsByTarget = jobContext.getPartitionsByTarget();
+    for (String targetResourcePartitionName : targetIdealState.getPartitionSet()) {
+      // Get all task partition numbers to be assigned to this targetResource partition
+      List<Integer> taskPartitions = partitionsByTarget.get(targetResourcePartitionName);
+      if (taskPartitions == null || taskPartitions.size() < 1) {
+        continue; // No tasks to assign, skip
+      }
+      // Get one task to be assigned to this targetResource partition
+      int targetPartitionId = taskPartitions.get(0);
+      // First, see if that task needs to be assigned at this time
+      if (taskPartitionSet.contains(targetPartitionId)) {
+        for (String instance : liveInstances) {
+          // See if there is a pending message on this instance for this partition of the
+          // target resource
+          // If there is, we should wait until the pending message gets processed, so skip
+          // assignment this time around
+          Message pendingMessage =
+              currStateOutput.getPendingState(targetIdealState.getResourceName(),
+                  new Partition(targetResourcePartitionName), instance);
+          if (pendingMessage != null) {
+            continue;
+          }
+
+          // See if a replica of this target partition exists on this instance
+          String currentState = currStateOutput.getCurrentState(targetIdealState.getResourceName(),
+              new Partition(targetResourcePartitionName), instance);
+          if (currentState != null
+              && (targetStates == null || targetStates.contains(currentState))) {
+
+            // Prepare pName and taskConfig for assignment
+            String pName = String.format("%s_%s", jobCfg.getJobId(), targetPartitionId);
+            if (!jobCfg.getTaskConfigMap().containsKey(pName)) {
+              jobCfg.getTaskConfigMap().put(pName,
+                  new TaskConfig(null, null, pName, targetResourcePartitionName));
+            }
+            TaskConfig taskConfig = jobCfg.getTaskConfigMap().get(pName);
 
+            // On LiveInstance change, RUNNING or other non-terminal tasks might get re-assigned. If
+            // the new assignment differs from prevAssignment, release. If the assigned instances
+            // from old and new assignments are the same, then do nothing and let it keep running
+            // The following checks if two assignments (old and new) differ
+            Map<String, String> instanceMap = prevAssignment.getReplicaMap(new Partition(pName));
+            Iterator<String> itr = instanceMap.keySet().iterator();
+            // First, check if this taskPartition has been ever assigned before by checking
+            // prevAssignment
+            if (itr.hasNext()) {
+              String prevInstance = itr.next();
+              if (!prevInstance.equals(instance)) {
+                // Old and new assignments are different. We need to release from prevInstance, and
+                // this task will be assigned to a different instance
+                if (assignableInstanceMap.containsKey(prevInstance)) {
+                  assignableInstanceMap.get(prevInstance).release(taskConfig, quotaType);
+                } else {
+                  // This instance must be no longer live
+                  LOG.warn(
+                      "Task {} was reassigned from old instance: {} to new instance: {}. However, old instance: {} is not found in AssignableInstanceMap. The old instance is possibly no longer a LiveInstance. This task will not be released.",
+                      pName, prevInstance, instance, prevInstance);
+                }
+              } else {
+                // Old and new assignments are the same, so just skip assignment for this
+                // taskPartition so that it can just keep running
+                break;
+              }
+            }
+
+            // Actual assignment logic: try to charge resources first and assign if successful
+            if (assignableInstanceMap.containsKey(instance)) {
+              AssignableInstance assignableInstance = assignableInstanceMap.get(instance);
+              // Try to assign first
+              TaskAssignResult taskAssignResult =
+                  assignableInstance.tryAssign(taskConfig, quotaType);
+              if (taskAssignResult.isSuccessful()) {
+                // There exists a partition, the states match up, and tryAssign successful. Assign!
+                assignableInstance.assign(taskAssignResult);
+                result.get(instance).add(targetPartitionId);
+                // To prevent double-assigning the task on other replicas of the targetResource
+                // partition
+                break;
+              } else if (taskAssignResult
+                  .getFailureReason() == TaskAssignResult.FailureReason.TASK_ALREADY_ASSIGNED) {
+                // In case of double assign, we can still include it in the assignment because
+                // RUNNING->RUNNING message will just be ignored by the participant
+                // AssignableInstance should already have it assigned, so do not double-charge
+                result.get(instance).add(targetPartitionId);
+                break;
+              } else {
+                LOG.warn(
+                    "Unable to assign the task to this AssignableInstance. Skipping this instance. Task: {}, Instance: {}, TaskAssignResult: {}",
+                    pName, instance, taskAssignResult);
+              }
+            } else {
+              LOG.error(
+                  "AssignableInstance does not exist for this LiveInstance: {}. This should never happen! Will not assign to this instance.",
+                  instance);
+            }
+          }
+        }
+      }
+    }
     return result;
   }
 }
\ No newline at end of file
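Taken together, the dispatcher and the assignment calculators follow a single try-assign / assign / release lifecycle per task on AssignableInstance. The sketch below assumes only the AssignableInstance, TaskAssignResult, and TaskConfig APIs used in the diffs above (the method wrapper and its arguments are placeholders) and shows where each call fits:

    // Per-task quota lifecycle, assuming the APIs used in the diffs above.
    void scheduleOneTask(AssignableInstance assignableInstance, TaskConfig taskConfig) {
      String quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE; // or jobConfig.getQuotaType()

      TaskAssignResult result = assignableInstance.tryAssign(taskConfig, quotaType);
      if (result.isSuccessful()) {
        // Capacity is available for this quota type: charge it and schedule the task
        assignableInstance.assign(result);
      } else if (result.getFailureReason() == TaskAssignResult.FailureReason.TASK_ALREADY_ASSIGNED) {
        // Already charged on this instance: keep the existing assignment, do not double-charge
      } else {
        // No capacity on this instance: the caller should try another instance
        return;
      }

      // Whenever the task is later throttled, dropped, aborted, or moved to another
      // instance, hand the quota back; otherwise it leaks:
      assignableInstance.release(taskConfig, quotaType);
    }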